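Elasticsearch offers two kinds of client API: the REST Client API (plain HTTP requests) and the TransportClient API. Of the two, TransportClient is the more efficient: it talks to the cluster over Elasticsearch's internal RPC protocol, can keep a long-lived connection open, and effectively makes the client behave like a node of the Elasticsearch cluster. However, TransportClient is deprecated as of Elasticsearch 7.0 and scheduled for removal in 8.0, mainly because it is hard to keep compatible across versions.
Everything in this article is based on JDK 1.8 and Elasticsearch 6.2.4.
Note: this article is compiled from the official Elasticsearch documentation and a number of online resources.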
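The High Level Client is built on top of the Low Level Client. Its main purpose is to expose APIs that take request objects as parameters and return response objects, while the client itself handles the details of building requests and parsing responses.
Every API can be called either synchronously or asynchronously:
A synchronous call blocks until the data comes back.
An asynchronous call has an Async suffix in its method name and takes a listener as a parameter. The listener is invoked when the request returns a result or fails. It is a callback mechanism, much like the listeners used in Android development.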
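The difference between the two calling styles can be sketched without Elasticsearch at all. The example below is a minimal illustration using only the JDK; `SyncAsyncSketch`, its `Listener` interface, and the `fetch`/`fetchAsync` methods are hypothetical names invented here, not part of the Elasticsearch client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a client; not part of Elasticsearch. */
class SyncAsyncSketch {

    /** Hypothetical callback interface, modeled on the listener idea described above. */
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    /** Synchronous form: the caller blocks until the result is available. */
    static String fetch(String id) {
        if (id == null || id.isEmpty()) {
            throw new IllegalArgumentException("id must not be empty");
        }
        return "doc-" + id;
    }

    /** Asynchronous form: returns immediately; the listener is called later on another thread. */
    static CompletableFuture<Void> fetchAsync(String id, Listener<String> listener) {
        return CompletableFuture.runAsync(() -> {
            try {
                listener.onResponse(fetch(id));
            } catch (Exception e) {
                listener.onFailure(e);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fetch("1")); // blocking call

        CountDownLatch done = new CountDownLatch(1);
        fetchAsync("2", new Listener<String>() {
            @Override
            public void onResponse(String response) {
                System.out.println(response);
                done.countDown();
            }

            @Override
            public void onFailure(Exception e) {
                e.printStackTrace();
                done.countDown();
            }
        });
        // Wait for the callback so the JVM does not exit before it runs.
        done.await(5, TimeUnit.SECONDS);
    }
}
```

The asynchronous variant reports both success and failure through the listener, so the caller never needs a try/catch around the call itself.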
Official Elasticsearch REST API reference: https://www.elastic.co/guide/en/elasticsearch/reference/6.2/index.html
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.2.4</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.2.4</version>
</dependency>
Client initialization:
A RestHighLevelClient instance is built from a REST low-level client builder.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.http.HttpHost;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;

    public class ElasticSearchClient {

        private final String[] hostsAndPorts;

        public ElasticSearchClient(String[] hostsAndPorts) {
            this.hostsAndPorts = hostsAndPorts;
        }

        /** Builds a client for the configured "host:port" entries, or for 127.0.0.1:9200 if none are given. */
        public RestHighLevelClient getClient() {
            List<HttpHost> httpHosts = new ArrayList<>();
            for (String hostAndPort : hostsAndPorts) {
                String[] hp = hostAndPort.split(":");
                httpHosts.add(new HttpHost(hp[0], Integer.parseInt(hp[1]), "http"));
            }
            if (httpHosts.isEmpty()) {
                httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
            }
            return new RestHighLevelClient(RestClient.builder(httpHosts.toArray(new HttpHost[0])));
        }
    }
1. Index API:
IndexRequest:
Reference helper methods:

    private IndexRequest getIndexRequest(String index, String indexType, String docId,
                                         Map<String, Object> dataMap) {
        if (null == index || null == indexType) {
            throw new ElasticsearchException("index or indexType must not be null");
        }
        // Without a docId, Elasticsearch generates the document id itself.
        IndexRequest indexRequest = (null == docId)
                ? new IndexRequest(index, indexType)
                : new IndexRequest(index, indexType, docId);
        return indexRequest.source(dataMap);
    }

    /**
     * Indexes a document synchronously.
     */
    public IndexResponse execIndex(String index, String indexType, String docId,
                                   Map<String, Object> dataMap) throws IOException {
        return getClient().index(getIndexRequest(index, indexType, docId, dataMap));
    }

    /**
     * Indexes a document asynchronously; the listener receives the result or the failure.
     */
    public void asyncExecIndex(String index, String indexType, String docId,
                               Map<String, Object> dataMap,
                               ActionListener<IndexResponse> indexResponseActionListener) throws IOException {
        getClient().indexAsync(getIndexRequest(index, indexType, docId, dataMap),
                indexResponseActionListener);
    }

Building an IndexRequest directly:

    IndexRequest request = new IndexRequest(
            "posts",  // index
            "doc",    // type
            "1");     // document id
    String jsonString = "{" +
            "\"user\":\"kimchy\"," +
            "\"postDate\":\"2013-01-30\"," +
            "\"message\":\"trying out Elasticsearch\"" +
            "}";
    request.source(jsonString, XContentType.JSON); // document source given as a JSON string
Document Source
The document source can be supplied in any of the following forms.
As a Map:

    Map<String, Object> jsonMap = new HashMap<>();
    jsonMap.put("user", "kimchy");
    jsonMap.put("postDate", new Date());
    jsonMap.put("message", "trying out Elasticsearch");
    IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
            .source(jsonMap); // the Map is converted to JSON automatically

As an XContentBuilder, a helper class for generating JSON content:

    XContentBuilder builder = XContentFactory.jsonBuilder();
    builder.startObject();
    {
        builder.field("user", "kimchy");
        builder.timeField("postDate", new Date());
        builder.field("message", "trying out Elasticsearch");
    }
    builder.endObject();
    IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
            .source(builder);

As Object key-value pairs:

    IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
            .source("user", "kimchy",
                    "postDate", new Date(),
                    "message", "trying out Elasticsearch");
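Synchronous indexing:

    IndexResponse indexResponse = client.index(request);

Asynchronous indexing: the asynchronous method needs a listener. For the Index API the listener type is ActionListener<IndexResponse>.

    client.indexAsync(request, listener);

The asynchronous method returns immediately. Once the index operation completes, the ActionListener is called back:
On success, onResponse is called.
On failure, onFailure is called.

    ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
        @Override
        public void onResponse(IndexResponse indexResponse) {
            // called when the operation succeeds
        }

        @Override
        public void onFailure(Exception e) {
            // called when the operation fails
        }
    };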
IndexResponse:
Whether the call is synchronous or asynchronous, a successful call yields an IndexResponse object.

    String index = indexResponse.getIndex();
    String type = indexResponse.getType();
    String id = indexResponse.getId();
    long version = indexResponse.getVersion();
    if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
        // the document was created for the first time
    } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
        // the document already existed and has been overwritten
    }
    ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
        // fewer shards succeeded than the total number of shards
    }
    if (shardInfo.getFailed() > 0) {
        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
            String reason = failure.reason(); // handle potential failures
        }
    }

If there is a version conflict while indexing, an ElasticsearchException is thrown:

    IndexRequest request = new IndexRequest("posts", "doc", "1")
            .source("field", "value")
            .version(1); // expected document version
    try {
        IndexResponse response = client.index(request);
    } catch (ElasticsearchException e) {
        if (e.status() == RestStatus.CONFLICT) {
            // version conflict
        }
    }

The same conflict is raised when opType is set to create and a document with the same index, type, and id already exists:

    IndexRequest request = new IndexRequest("posts", "doc", "1")
            .source("field", "value")
            .opType(DocWriteRequest.OpType.CREATE);
    try {
        IndexResponse response = client.index(request);
    } catch (ElasticsearchException e) {
        if (e.status() == RestStatus.CONFLICT) {
            // a document with this index, type, and id already exists
        }
    }
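To make the two conflict cases concrete, here is a minimal in-memory sketch of the same idea (optimistic concurrency control). It is only an illustration using the JDK: `VersionConflictSketch` and `ConflictException` are hypothetical names, and the rule "the expected version must equal the current version" is a simplified model of internal versioning, not the server's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory store illustrating version checks; not Elasticsearch code. */
class VersionConflictSketch {

    /** Hypothetical exception standing in for a 409 CONFLICT response. */
    static class ConflictException extends RuntimeException {
        ConflictException(String message) {
            super(message);
        }
    }

    private final Map<String, Long> versions = new HashMap<>();
    private final Map<String, String> sources = new HashMap<>();

    /** Writes a document. A null expectedVersion skips the check. Returns the new version. */
    synchronized long index(String id, String source, Long expectedVersion) {
        Long current = versions.get(id);
        if (expectedVersion != null && !expectedVersion.equals(current)) {
            throw new ConflictException(
                    "expected version " + expectedVersion + " but found " + current);
        }
        long next = (current == null) ? 1L : current + 1L;
        versions.put(id, next);
        sources.put(id, source);
        return next;
    }

    /** Create-only write: fails if a document with this id already exists. */
    synchronized long create(String id, String source) {
        if (versions.containsKey(id)) {
            throw new ConflictException("document " + id + " already exists");
        }
        return index(id, source, null);
    }

    public static void main(String[] args) {
        VersionConflictSketch store = new VersionConflictSketch();
        System.out.println(store.create("1", "first"));      // new document
        System.out.println(store.index("1", "second", 1L));  // version matches, write succeeds
        try {
            store.index("1", "third", 1L);                    // stale version
        } catch (ConflictException e) {
            System.out.println("conflict: " + e.getMessage());
        }
    }
}
```

A writer that hits the conflict typically re-reads the document, reapplies its change, and retries with the fresh version.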
2. GET API
Get Request
Every GET request must be given the following three arguments: index, type, and document id.

    GetRequest getRequest = new GetRequest(
            "posts",  // index
            "doc",    // type
            "1");     // document id

Optional parameters
The parameters below are all optional, and the list is not complete. See the official documentation for the full set.
Disable fetching the source (it is fetched by default):

    getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);

Return only specific fields:

    String[] includes = new String[]{"message", "*Date"};
    String[] excludes = Strings.EMPTY_ARRAY;
    FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
    getRequest.fetchSourceContext(fetchSourceContext);

Exclude specific fields from the returned data:

    String[] includes = Strings.EMPTY_ARRAY;
    String[] excludes = new String[]{"message"};
    FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
    getRequest.fetchSourceContext(fetchSourceContext);
Realtime flag (true by default):

    getRequest.realtime(false);

Version:

    getRequest.version(2);

Version type:

    getRequest.versionType(VersionType.EXTERNAL);

Synchronous execution

    GetResponse getResponse = client.get(getRequest);

Asynchronous execution
This works like the Index API, with one difference: the listener receives a GetResponse.
Get Response
The returned GetResponse object contains the requested document (metadata and fields).

    String index = getResponse.getIndex();
    String type = getResponse.getType();
    String id = getResponse.getId();
    if (getResponse.isExists()) {
        long version = getResponse.getVersion();
        String sourceAsString = getResponse.getSourceAsString();        // source as a String
        Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // source as a Map
        byte[] sourceAsBytes = getResponse.getSourceAsBytes();          // source as bytes
    } else {
        // the requested document was not found
    }

If the request specifies a document version that does not match the version of the existing document, a conflict is raised:

    try {
        GetRequest request = new GetRequest("posts", "doc", "1").version(2);
        GetResponse getResponse = client.get(request);
    } catch (ElasticsearchException exception) {
        if (exception.status() == RestStatus.CONFLICT) {
            // version conflict
        }
    }
Reference helper methods:

    /**
     * @param index       index name
     * @param indexType   document type
     * @param docId       document id
     * @param includes    fields to include in the returned source; may be null or empty
     * @param excludes    fields to exclude from the returned source; may be null or empty
     * @param version     expected document version; may be null
     * @param versionType version type; may be null
     */
    public GetResponse getRequest(String index, String indexType, String docId,
                                  String[] includes, String[] excludes,
                                  Integer version, VersionType versionType) throws IOException {
        if (null == includes || includes.length == 0) {
            includes = Strings.EMPTY_ARRAY;
        }
        if (null == excludes || excludes.length == 0) {
            excludes = Strings.EMPTY_ARRAY;
        }
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
        getRequest.realtime(true);
        if (null != version) {
            getRequest.version(version);
        }
        if (null != versionType) {
            getRequest.versionType(versionType);
        }
        return getClient().get(getRequest.fetchSourceContext(fetchSourceContext));
    }

    /** Same as above, without a version check. */
    public GetResponse getRequest(String index, String indexType, String docId,
                                  String[] includes, String[] excludes) throws IOException {
        return getRequest(index, indexType, docId, includes, excludes, null, null);
    }

    /** Fetches the whole document with default options. */
    public GetResponse getRequest(String index, String indexType, String docId) throws IOException {
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        return getClient().get(getRequest);
    }
The Exists API returns true if the document exists and false otherwise.
It takes a GetRequest, used much as in the Get API, and all the optional parameters are the same. Because exists() only returns true or false, it is recommended to turn off fetching _source and any stored fields so that the request is as lightweight as possible.

    GetRequest getRequest = new GetRequest(
            "posts",  // index
            "doc",    // type
            "1");     // document id
    getRequest.fetchSourceContext(new FetchSourceContext(false)); // do not fetch _source
    getRequest.storedFields("_none_");                            // do not fetch stored fields

Synchronous request

    boolean exists = client.exists(getRequest);

Asynchronous request
The asynchronous request works like the Index API, so only the code is shown here. See the official documentation for details.

    ActionListener<Boolean> listener = new ActionListener<Boolean>() {
        @Override
        public void onResponse(Boolean exists) {
        }

        @Override
        public void onFailure(Exception e) {
        }
    };
    client.existsAsync(getRequest, listener);

Reference helper method:

    /** Returns true if the document exists, without fetching its source. */
    public Boolean existDoc(String index, String indexType, String docId) throws IOException {
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        getRequest.storedFields("_none_");
        return getClient().exists(getRequest);
    }
Delete Request
A DeleteRequest requires the following arguments:

    DeleteRequest request = new DeleteRequest(
            "posts",  // index
            "doc",    // type
            "1");     // document id

Optional parameters
Timeout:

    request.timeout(TimeValue.timeValueMinutes(2));
    request.timeout("2m");

Refresh policy:

    request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
    request.setRefreshPolicy("wait_for");

Version:

    request.version(2);

Version type:

    request.versionType(VersionType.EXTERNAL);

Synchronous execution

    DeleteResponse deleteResponse = client.delete(request);

Asynchronous execution

    ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
        @Override
        public void onResponse(DeleteResponse deleteResponse) {
        }

        @Override
        public void onFailure(Exception e) {
        }
    };
    client.deleteAsync(request, listener);

Delete Response
The DeleteResponse gives access to information about the executed operation:

    String index = deleteResponse.getIndex();
    String type = deleteResponse.getType();
    String id = deleteResponse.getId();
    long version = deleteResponse.getVersion();
    ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
        // fewer shards succeeded than the total number of shards
    }
    if (shardInfo.getFailed() > 0) {
        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
            String reason = failure.reason(); // handle potential failures
        }
    }

It can also be used to check whether the document existed:

    DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist");
    DeleteResponse deleteResponse = client.delete(request);
    if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
        // the document did not exist
    }

A version conflict also throws an ElasticsearchException:

    try {
        DeleteRequest request = new DeleteRequest("posts", "doc", "1").version(2);
        DeleteResponse deleteResponse = client.delete(request);
    } catch (ElasticsearchException exception) {
        if (exception.status() == RestStatus.CONFLICT) {
            // version conflict
        }
    }

Reference helper methods:

    /**
     * Deletes a document. timeValue, refreshPolicy, version, and versionType are optional and may be null.
     */
    public DeleteResponse deleteDoc(String index, String indexType, String docId,
                                    TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy,
                                    Integer version, VersionType versionType) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(index, indexType, docId);
        if (null != timeValue) {
            deleteRequest.timeout(timeValue);
        }
        if (null != refreshPolicy) {
            deleteRequest.setRefreshPolicy(refreshPolicy);
        }
        if (null != version) {
            deleteRequest.version(version);
        }
        if (null != versionType) {
            deleteRequest.versionType(versionType);
        }
        return getClient().delete(deleteRequest);
    }

    /** Deletes a document with default options. */
    public DeleteResponse deleteDoc(String index, String indexType, String docId) throws IOException {
        return deleteDoc(index, indexType, docId, null, null, null, null);
    }
Update Request
An UpdateRequest requires the following arguments:

    UpdateRequest request = new UpdateRequest(
            "posts",  // index
            "doc",    // type
            "1");     // document id

Updates with a partial document:
When updating with a partial document, the partial document is merged with the existing document.
The partial document can be provided in the following forms.
As a JSON string:

    UpdateRequest request = new UpdateRequest("posts", "doc", "1");
    String jsonString = "{" +
            "\"updated\":\"2017-01-01\"," +
            "\"reason\":\"daily update\"" +
            "}";
    request.doc(jsonString, XContentType.JSON);

As a Map:

    Map<String, Object> jsonMap = new HashMap<>();
    jsonMap.put("updated", new Date());
    jsonMap.put("reason", "daily update");
    UpdateRequest request = new UpdateRequest("posts", "doc", "1")
            .doc(jsonMap);

As an XContentBuilder object:

    XContentBuilder builder = XContentFactory.jsonBuilder();
    builder.startObject();
    {
        builder.timeField("updated", new Date());
        builder.field("reason", "daily update");
    }
    builder.endObject();
    UpdateRequest request = new UpdateRequest("posts", "doc", "1")
            .doc(builder);

As Object key-value pairs:

    UpdateRequest request = new UpdateRequest("posts", "doc", "1")
            .doc("updated", new Date(),
                 "reason", "daily update");

Upserts: if the document does not exist yet, the upsert method defines content that is inserted as a new document.

    String jsonString = "{\"created\":\"2017-01-01\"}";
    request.upsert(jsonString, XContentType.JSON);

The upsert method accepts the same document formats as the doc method.
Optional parameters:
Timeout:

    request.timeout(TimeValue.timeValueSeconds(1));
    request.timeout("1s");

Refresh policy:

    request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
    request.setRefreshPolicy("wait_for");

Number of retries on conflict:

    request.retryOnConflict(3);

Enable source retrieval in the response:

    request.fetchSource(true);

Include specific fields:

    String[] includes = new String[]{"updated", "r*"};
    String[] excludes = Strings.EMPTY_ARRAY;
    request.fetchSource(new FetchSourceContext(true, includes, excludes));

Exclude specific fields:

    String[] includes = Strings.EMPTY_ARRAY;
    String[] excludes = new String[]{"updated"};
    request.fetchSource(new FetchSourceContext(true, includes, excludes));

Version:

    request.version(2);

Disable noop detection:

    request.detectNoop(false);

Run the script whether or not the document already exists:

    request.scriptedUpsert(true);

Use the partial document as the upsert document, so that it is created if it does not exist yet:

    request.docAsUpsert(true);

Synchronous execution

    UpdateResponse updateResponse = client.update(request);

Asynchronous execution

    ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() {
        @Override
        public void onResponse(UpdateResponse updateResponse) {
        }

        @Override
        public void onFailure(Exception e) {
        }
    };
    client.updateAsync(request, listener);
The returned UpdateResponse describes the executed operation:

    String index = updateResponse.getIndex();
    String type = updateResponse.getType();
    String id = updateResponse.getId();
    long version = updateResponse.getVersion();
    if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
        // the document was created
    } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
        // the document was updated
    } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
        // the document was deleted
    } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
        // the document was not affected by the update
    }

If source retrieval was enabled on the UpdateRequest, the response contains the source of the updated document.

    GetResult result = updateResponse.getGetResult();
    if (result.isExists()) {
        String sourceAsString = result.sourceAsString();        // source as a String
        Map<String, Object> sourceAsMap = result.sourceAsMap(); // source as a Map
        byte[] sourceAsBytes = result.source();                 // source as bytes
    } else {
        // the source is not present in the response (this is the default)
    }

Shard failures can be checked as well:

    ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
        // fewer shards succeeded than the total number of shards
    }
    if (shardInfo.getFailed() > 0) {
        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
            String reason = failure.reason(); // reason for the shard failure
        }
    }

If an UpdateRequest is executed against a document that does not exist, the response has a 404 status code and an ElasticsearchException is thrown.

    UpdateRequest request = new UpdateRequest("posts", "type", "does_not_exist")
            .doc("field", "value");
    try {
        UpdateResponse updateResponse = client.update(request);
    } catch (ElasticsearchException e) {
        if (e.status() == RestStatus.NOT_FOUND) {
            // handle the missing document
        }
    }

A version conflict also throws an ElasticsearchException:

    UpdateRequest request = new UpdateRequest("posts", "doc", "1")
            .doc("field", "value")
            .version(1);
    try {
        UpdateResponse updateResponse = client.update(request);
    } catch (ElasticsearchException e) {
        if (e.status() == RestStatus.CONFLICT) {
            // handle the version conflict
        }
    }
Reference helper methods:

    /**
     * Updates a document. timeValue, refreshPolicy, version, versionType, includes, and excludes
     * are optional and may be null; docAsUpsert must not be null.
     */
    public UpdateResponse updateDoc(String index, String indexType, String docId,
                                    Map<String, Object> dataMap, TimeValue timeValue,
                                    WriteRequest.RefreshPolicy refreshPolicy, Integer version,
                                    VersionType versionType, Boolean docAsUpsert,
                                    String[] includes, String[] excludes) throws IOException {
        UpdateRequest updateRequest = new UpdateRequest(index, indexType, docId);
        updateRequest.doc(dataMap);
        if (null != timeValue) {
            updateRequest.timeout(timeValue);
        }
        if (null != refreshPolicy) {
            updateRequest.setRefreshPolicy(refreshPolicy);
        }
        if (null != version) {
            updateRequest.version(version);
        }
        if (null != versionType) {
            updateRequest.versionType(versionType);
        }
        updateRequest.docAsUpsert(docAsUpsert);
        // number of retries on conflict
        updateRequest.retryOnConflict(3);
        if (null == includes && null == excludes) {
            return getClient().update(updateRequest);
        }
        if (null == includes || includes.length == 0) {
            includes = Strings.EMPTY_ARRAY;
        }
        if (null == excludes || excludes.length == 0) {
            excludes = Strings.EMPTY_ARRAY;
        }
        return getClient().update(
                updateRequest.fetchSource(new FetchSourceContext(true, includes, excludes)));
    }

    /** Updates the document, inserting it if it does not exist. */
    public UpdateResponse updateDocAsUpsert(String index, String indexType, String docId,
                                            Map<String, Object> dataMap) throws IOException {
        return updateDoc(index, indexType, docId, dataMap,
                null, null, null, null, true, null, null);
    }

    /** Updates the document only if it already exists. */
    public UpdateResponse updateDoc(String index, String indexType, String docId,
                                    Map<String, Object> dataMap) throws IOException {
        return updateDoc(index, indexType, docId, dataMap,
                null, null, null, null, false, null, null);
    }
Bulk Request
A BulkRequest executes multiple index, update, and delete operations in a single request.

    BulkRequest request = new BulkRequest();
    request.add(new IndexRequest("posts", "doc", "1")
            .source(XContentType.JSON, "field", "foo")); // first IndexRequest added to the bulk request
    request.add(new IndexRequest("posts", "doc", "2")
            .source(XContentType.JSON, "field", "bar")); // second
    request.add(new IndexRequest("posts", "doc", "3")
            .source(XContentType.JSON, "field", "baz")); // third

Different operation types can be added to the same BulkRequest:

    BulkRequest request = new BulkRequest();
    request.add(new DeleteRequest("posts", "doc", "3"));
    request.add(new UpdateRequest("posts", "doc", "2")
            .doc(XContentType.JSON, "other", "test"));
    request.add(new IndexRequest("posts", "doc", "4")
            .source(XContentType.JSON, "field", "baz"));

Optional parameters
Timeout:

    request.timeout(TimeValue.timeValueMinutes(2));
    request.timeout("2m");

Refresh policy:

    request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
    request.setRefreshPolicy("wait_for");

Number of shard copies that must be active before the bulk operation proceeds:

    request.waitForActiveShards(2);
    request.waitForActiveShards(ActiveShardCount.ALL);     // all shard copies
    request.waitForActiveShards(ActiveShardCount.DEFAULT); // the default
    request.waitForActiveShards(ActiveShardCount.ONE);     // one

Synchronous request

    BulkResponse bulkResponse = client.bulk(request);

Asynchronous request

    ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
        @Override
        public void onResponse(BulkResponse bulkResponse) {
        }

        @Override
        public void onFailure(Exception e) {
        }
    };
    client.bulkAsync(request, listener);
Bulk Response
The BulkResponse contains information about the executed operations and allows iterating over each result.

    for (BulkItemResponse bulkItemResponse : bulkResponse) { // iterate over all operation results
        // The item response can be an IndexResponse, UpdateResponse, or DeleteResponse;
        // all of them can be treated as DocWriteResponse instances.
        DocWriteResponse itemResponse = bulkItemResponse.getResponse();

        if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
            IndexResponse indexResponse = (IndexResponse) itemResponse;    // result of an index operation
        } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
            UpdateResponse updateResponse = (UpdateResponse) itemResponse; // result of an update operation
        } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
            DeleteResponse deleteResponse = (DeleteResponse) itemResponse; // result of a delete operation
        }
    }

The bulk response also offers a convenient method for checking whether one or more operations failed:

    if (bulkResponse.hasFailures()) {
        // at least one operation failed
    }

In that case, iterate over all the results, check which operations failed, and retrieve the corresponding failure:

    for (BulkItemResponse bulkItemResponse : bulkResponse) {
        if (bulkItemResponse.isFailed()) { // did this operation fail?
            BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); // failure details
        }
    }
Bulk Processor
BulkProcessor is a utility class that simplifies use of the Bulk API. It needs the following components:
RestHighLevelClient: executes the BulkRequest and retrieves the BulkResponse.
BulkProcessor.Listener: called before and after every BulkRequest execution, and when a BulkRequest fails.
The BulkProcessor.builder method is then used to build a new BulkProcessor.

    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            // called before each BulkRequest is executed
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            // called after each BulkRequest is executed
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            // called when a BulkRequest fails
        }
    };

BulkProcessor.Builder provides several methods for configuring how the BulkProcessor handles request execution.

    BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener);
    builder.setBulkActions(500);                                   // flush after this many actions
    builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));   // flush once the buffered requests reach this size
    builder.setConcurrentRequests(0);                              // number of concurrent requests allowed
    builder.setFlushInterval(TimeValue.timeValueSeconds(10L));     // flush at this interval regardless of load
    builder.setBackoffPolicy(BackoffPolicy
            .constantBackoff(TimeValue.timeValueSeconds(1L), 3));  // wait 1 second between retries, up to 3 retries
    BulkProcessor bulkProcessor = builder.build();

Once the BulkProcessor has been created, requests can be added to it:

    IndexRequest one = new IndexRequest("posts", "doc", "1")
            .source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
    IndexRequest two = new IndexRequest("posts", "doc", "2")
            .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
    IndexRequest three = new IndexRequest("posts", "doc", "3")
            .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");
    bulkProcessor.add(one);
    bulkProcessor.add(two);
    bulkProcessor.add(three);

While it runs, the BulkProcessor calls the BulkProcessor.Listener for every bulk request. The listener's methods give access to the BulkRequest and the BulkResponse:

    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            int numberOfActions = request.numberOfActions(); // number of actions about to be executed
            logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            if (response.hasFailures()) { // check whether the response contains failed operations
                logger.warn("Bulk [{}] executed with failures", executionId);
            } else {
                logger.debug("Bulk [{}] completed in {} milliseconds",
                        executionId, response.getTook().getMillis());
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            logger.error("Failed to execute bulk", failure); // log the failed request
        }
    };
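Once all requests have been added to the BulkProcessor, the instance can be closed with one of two methods.
awaitClose() waits until all requests have been processed or the given time has elapsed:

    boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);

close() closes the processor immediately:

    bulkProcessor.close();

Both methods flush the requests already added to the processor before closing, and prevent new requests from being added.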
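The buffer-then-flush behaviour described above can be illustrated with a small JDK-only sketch. This is not how BulkProcessor is implemented: `BatcherSketch` is a hypothetical class that flushes only on an action-count threshold and on close, and leaves out the size threshold, the timed flush, concurrency, and retries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical count-based batcher; a simplified model, not the BulkProcessor implementation. */
class BatcherSketch<T> {
    private final int maxActions;
    private final Consumer<List<T>> sink;
    private final List<T> buffer = new ArrayList<>();
    private boolean closed;

    BatcherSketch(int maxActions, Consumer<List<T>> sink) {
        if (maxActions < 1) {
            throw new IllegalArgumentException("maxActions must be at least 1");
        }
        this.maxActions = maxActions;
        this.sink = sink;
    }

    /** Buffers one item and flushes automatically once the threshold is reached. */
    synchronized void add(T item) {
        if (closed) {
            throw new IllegalStateException("batcher is closed");
        }
        buffer.add(item);
        if (buffer.size() >= maxActions) {
            flush();
        }
    }

    /** Hands the buffered items to the sink as one batch; does nothing if the buffer is empty. */
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        sink.accept(batch);
    }

    /** Flushes what is left, then rejects further additions. */
    synchronized void close() {
        flush();
        closed = true;
    }

    public static void main(String[] args) {
        BatcherSketch<String> batcher =
                new BatcherSketch<>(2, batch -> System.out.println("flushing " + batch));
        batcher.add("index doc 1");
        batcher.add("index doc 2"); // threshold reached, first batch is flushed
        batcher.add("delete doc 3");
        batcher.close();            // remaining item is flushed on close
    }
}
```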
Reference helper methods (IndexBean is the author's own bean class and is not shown here):

    /**
     * Executes a bulk request. timeValue and refreshPolicy are optional and may be null.
     */
    public BulkResponse bulkRequest(List<IndexBean> indexBeanList, TimeValue timeValue,
                                    WriteRequest.RefreshPolicy refreshPolicy) throws IOException {
        BulkRequest bulkRequest = getBulkRequest(indexBeanList);
        if (null != timeValue) {
            bulkRequest.timeout(timeValue);
        }
        if (null != refreshPolicy) {
            bulkRequest.setRefreshPolicy(refreshPolicy);
        }
        return getClient().bulk(bulkRequest);
    }

    // Operate types: "1" = index, "2" = update, "3" = delete.
    // Note: the document source (for index) and the partial document (for update) are not set here.
    private BulkRequest getBulkRequest(List<IndexBean> indexBeanList) {
        BulkRequest bulkRequest = new BulkRequest();
        indexBeanList.forEach(indexBean -> {
            if ("1".equals(indexBean.getOperateType())) {
                bulkRequest.add(null != indexBean.getDocId()
                        ? new IndexRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId())
                        : new IndexRequest(indexBean.getIndex(), indexBean.getIndexType()));
            } else if ("2".equals(indexBean.getOperateType())) {
                if (null == indexBean.getDocId()) {
                    throw new ElasticsearchException("update action docId must not be null");
                }
                bulkRequest.add(new UpdateRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()));
            } else if ("3".equals(indexBean.getOperateType())) {
                if (null == indexBean.getDocId()) {
                    throw new ElasticsearchException("delete action docId must not be null");
                }
                bulkRequest.add(new DeleteRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()));
            } else {
                throw new ElasticsearchException("OperateType " + indexBean.getOperateType() + " is not supported");
            }
        });
        return bulkRequest;
    }

    /** Executes a bulk request with default options. */
    public BulkResponse bulkRequest(List<IndexBean> indexBeanList) throws IOException {
        return bulkRequest(indexBeanList, null, null);
    }

    /** Executes a bulk request asynchronously. */
    public void asyncBulkRequest(List<IndexBean> indexBeanList,
                                 ActionListener<BulkResponse> bulkResponseActionListener) {
        getClient().bulkAsync(getBulkRequest(indexBeanList), bulkResponseActionListener);
    }
The Java High Level REST Client supports the following search API:
Search API
Search Request
SearchRequest is used for any operation that has to do with searching documents, aggregations, and suggestions. It also offers ways of requesting highlighting on the results.
The most basic form of the request is:

    SearchRequest searchRequest = new SearchRequest();
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.query(QueryBuilders.matchAllQuery()); // add a match_all query
    searchRequest.source(searchSourceBuilder);                // add the SearchSourceBuilder to the SearchRequest

Optional parameters

    SearchRequest searchRequest = new SearchRequest("posts"); // restrict the search to an index
    searchRequest.types("doc");                               // restrict the search to a type

Besides the index and type, there are a few other optional parameters:

    searchRequest.routing("routing");   // set the routing parameter
    searchRequest.preference("_local"); // prefer local shards; the default is to randomize across shards
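What is the routing parameter?
When a document is indexed, it is stored on a single primary shard, and an index usually has several primary shards. How does Elasticsearch know which shard a document belongs to? It is determined by the following formula:
shard = hash(routing) % number_of_primary_shards
routing is a variable value. It defaults to the document's _id but can be set to a custom value.
number_of_primary_shards is the number of primary shards.
All document APIs accept a routing parameter that can be used to customize the document-to-shard mapping. A custom routing value can ensure that all related documents (for example, all documents belonging to the same user) are stored on the same shard.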
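The routing formula can be tried out with a few lines of plain Java. This is a sketch of the arithmetic only: `RoutingSketch` and `shardFor` are hypothetical names, and `String.hashCode()` is used here as a stand-in for the hash function that Elasticsearch applies internally.

```java
/** Hypothetical illustration of shard = hash(routing) % number_of_primary_shards. */
class RoutingSketch {

    /** Maps a routing value to a shard number in the range [0, numberOfPrimaryShards). */
    static int shardFor(String routing, int numberOfPrimaryShards) {
        if (numberOfPrimaryShards < 1) {
            throw new IllegalArgumentException("numberOfPrimaryShards must be at least 1");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(routing.hashCode(), numberOfPrimaryShards);
    }

    public static void main(String[] args) {
        // Documents sharing a routing value always land on the same shard.
        System.out.println(shardFor("user-42", 5));
        System.out.println(shardFor("user-42", 5));
        // The same routing value generally maps elsewhere if the shard count changes.
        System.out.println(shardFor("user-42", 6));
    }
}
```

Because the shard number depends on the number of primary shards, changing that number would send the same routing value to a different shard, which is why the formula only works while the primary shard count stays fixed.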
Using the SearchSourceBuilder
Most options that control the search behaviour are set on the SearchSourceBuilder. Here is an example:

    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();    // default options
    sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy"));   // set the query; any QueryBuilder can be used
    sourceBuilder.from(0);                                            // offset of the first result to return
    sourceBuilder.size(5);                                            // number of results to return
    sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));       // search timeout

Once configured, the SearchSourceBuilder is added to the SearchRequest.

    SearchRequest searchRequest = new SearchRequest();
    searchRequest.source(sourceBuilder);
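Building queries
Search queries are created with QueryBuilder objects, which cover the queries of the Query DSL.
A DSL (domain-specific language) is a computer language specialized for one application domain.
A QueryBuilder can be created with its constructor:

    MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("user", "kimchy");

Once created, the QueryBuilder object provides methods for configuring the options of the query:

    matchQueryBuilder.fuzziness(Fuzziness.AUTO); // enable fuzzy matching
    matchQueryBuilder.prefixLength(3);           // set the prefix length option
    matchQueryBuilder.maxExpansions(10);         // set the max expansions option, which limits the fuzzy expansion

QueryBuilder objects can also be created with the QueryBuilders utility class, which provides fluent helper methods for building them quickly.

    QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("user", "kimchy")
            .fuzziness(Fuzziness.AUTO)
            .prefixLength(3)
            .maxExpansions(10);

Fuzzy matching: matching despite misspellings
Good full-text search should not be limited to exact matches. Instead, it can widen the net to include possible matches and use the relevance score to push the better matches to the top.
For example, a search for quick brown fox would also match a document containing fast brown foxes.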
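Tolerance for misspellings is usually expressed as an edit distance between two terms. The sketch below computes the plain Levenshtein distance (insertions, deletions, substitutions) with the JDK only. `EditDistanceSketch` is a hypothetical name, and this simplified measure is an assumption for illustration: it does not treat a swap of two adjacent characters as a single edit.

```java
/** Hypothetical helper: plain Levenshtein distance between two strings. */
class EditDistanceSketch {

    static int distance(String a, String b) {
        int[] prev = new int[b.length() + 1];
        int[] curr = new int[b.length() + 1];
        // Distance from the empty prefix of a to each prefix of b.
        for (int j = 0; j <= b.length(); j++) {
            prev[j] = j;
        }
        for (int i = 1; i <= a.length(); i++) {
            curr[0] = i;
            for (int j = 1; j <= b.length(); j++) {
                int cost = (a.charAt(i - 1) == b.charAt(j - 1)) ? 0 : 1;
                int insertion = curr[j - 1] + 1;
                int deletion = prev[j] + 1;
                int substitution = prev[j - 1] + cost;
                curr[j] = Math.min(Math.min(insertion, deletion), substitution);
            }
            // Reuse the two rows instead of allocating a full matrix.
            int[] tmp = prev;
            prev = curr;
            curr = tmp;
        }
        return prev[b.length()];
    }

    public static void main(String[] args) {
        System.out.println(distance("kimchy", "kmichy"));
        System.out.println(distance("fox", "foxes"));
        System.out.println(distance("quick", "quick"));
    }
}
```

A fuzzy matcher built on this would accept a candidate term when its distance to the query term stays within a small bound.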
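However it was created, the QueryBuilder must finally be added to the SearchSourceBuilder:

    searchSourceBuilder.query(matchQueryBuilder);

The Building Queries page of the official documentation lists all available queries with their QueryBuilder classes and the corresponding QueryBuilders helper methods, and is worth consulting.
Building queries will be covered in more detail in the next article.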
Specifying sorting
SearchSourceBuilder allows adding one or more SortBuilder instances. There are four specialized implementations: FieldSortBuilder, ScoreSortBuilder, GeoDistanceSortBuilder, and ScriptSortBuilder.

    sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));      // sort by _score, descending (the default)
    sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC)); // then sort by _uid, ascending

Source filtering
By default, search requests return the document _source, but this can be changed. For example, fetching _source can be turned off entirely:

    sourceBuilder.fetchSource(false);

Wildcard patterns can also be used to include or exclude specific fields at a finer granularity:

    String[] includeFields = new String[] {"title", "user", "innerObject.*"};
    String[] excludeFields = new String[] {"_type"};
    sourceBuilder.fetchSource(includeFields, excludeFields);
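The effect of include and exclude patterns can be sketched on a plain Map. The code below is an illustration under simplifying assumptions, not the server's filtering logic: `SourceFilterSketch` is a hypothetical class, it only looks at top-level field names (so a pattern such as innerObject.* is not expanded into nested objects), and `*` is the only wildcard it understands.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical top-level source filter with '*' wildcards; a simplified model. */
class SourceFilterSketch {

    /** Returns true if the field name matches the pattern, where '*' matches any run of characters. */
    static boolean matches(String pattern, String field) {
        StringBuilder regex = new StringBuilder();
        String[] parts = pattern.split("\\*", -1);
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(parts[i])); // everything except '*' is matched literally
        }
        return Pattern.matches(regex.toString(), field);
    }

    private static boolean anyMatch(String[] patterns, String field) {
        for (String pattern : patterns) {
            if (matches(pattern, field)) {
                return true;
            }
        }
        return false;
    }

    /** Keeps a field if it is included (or no includes are given) and not excluded. */
    static Map<String, Object> filter(Map<String, Object> source, String[] includes, String[] excludes) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            boolean included = includes.length == 0 || anyMatch(includes, entry.getKey());
            boolean excluded = anyMatch(excludes, entry.getKey());
            if (included && !excluded) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("title", "Hello");
        source.put("user", "kimchy");
        source.put("postDate", "2013-01-30");
        source.put("message", "trying out Elasticsearch");
        System.out.println(filter(source, new String[]{"title", "*Date"}, new String[0]));
        System.out.println(filter(source, new String[0], new String[]{"message"}));
    }
}
```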
Requesting highlighting
Search results can be highlighted by setting a HighlightBuilder on the SearchSourceBuilder. Different fields can be given different highlighting behaviour.

    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    HighlightBuilder highlightBuilder = new HighlightBuilder();
    HighlightBuilder.Field highlightTitle = new HighlightBuilder.Field("title"); // highlight the title field
    highlightTitle.highlighterType("unified");                                   // set the highlighter type
    highlightBuilder.field(highlightTitle);                                      // add the field to the builder
    HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");
    highlightBuilder.field(highlightUser);
    searchSourceBuilder.highlighter(highlightBuilder);

Requesting aggregations
An aggregation request takes two steps:
Create the appropriate AggregationBuilder.
Set it on the SearchSourceBuilder.

    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company")
            .field("company.keyword");
    aggregation.subAggregation(AggregationBuilders.avg("average_age")
            .field("age"));
    searchSourceBuilder.aggregation(aggregation);

Requesting suggestions
SuggestionBuilder implementations are created with the SuggestBuilders factory class.

    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    SuggestionBuilder termSuggestionBuilder =
            SuggestBuilders.termSuggestion("user").text("kmichy");
    SuggestBuilder suggestBuilder = new SuggestBuilder();
    suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
    searchSourceBuilder.suggest(suggestBuilder);

Profiling queries and aggregations
The Profile API can be used to profile the execution of the queries and aggregations of a specific search request. To use it, set the profile flag on the SearchSourceBuilder to true:

    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.profile(true);

Once the SearchRequest has been executed, the corresponding SearchResponse contains the profiling results.
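Synchronous execution
Synchronous execution blocks: the program continues only after the result has been returned.

    SearchResponse searchResponse = client.search(searchRequest);

Asynchronous execution
Asynchronous execution hands the result to a listener.

    ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
        @Override
        public void onResponse(SearchResponse searchResponse) {
            // the search succeeded
        }

        @Override
        public void onFailure(Exception e) {
            // the search failed
        }
    };
    client.searchAsync(searchRequest, listener);

Search Response
When the search has been executed, a SearchResponse object is returned. It contains details about the execution of the search and the documents that matched.
In summary, a SearchResponse contains the following information.
Information about the request itself, such as the HTTP status code, the execution time, and whether the request timed out:

    RestStatus status = searchResponse.status();                   // HTTP status code
    TimeValue took = searchResponse.getTook();                     // time taken by the search
    Boolean terminatedEarly = searchResponse.isTerminatedEarly();  // whether it ended early because of terminateAfter on the SearchSourceBuilder
    boolean timedOut = searchResponse.isTimedOut();                // whether it timed out

Statistics about the shards affected by the search, including successful and failed shards:

    int totalShards = searchResponse.getTotalShards();
    int successfulShards = searchResponse.getSuccessfulShards();
    int failedShards = searchResponse.getFailedShards();
    for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
        // failures should be handled here
    }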
Retrieving SearchHits
To access the returned documents, first get the SearchHits from the response:

    SearchHits hits = searchResponse.getHits();

SearchHits carries global information about all hits, such as the total number of hits and the maximum score:

    long totalHits = hits.getTotalHits();
    float maxScore = hits.getMaxScore();

The individual results are nested inside SearchHits and can be iterated over:

    SearchHit[] searchHits = hits.getHits();
    for (SearchHit hit : searchHits) {
        // do something with the SearchHit
    }

A SearchHit gives access to basic information such as the index, type, document id and score of each hit:

    String index = hit.getIndex();
    String type = hit.getType();
    String id = hit.getId();
    float score = hit.getScore();

The document source can also be retrieved, either as a JSON string or as a map of key/value pairs. In the map, a value can be a plain value, a list (for multi-valued fields) or another map (for nested objects):

    String sourceAsString = hit.getSourceAsString();
    Map<String, Object> sourceAsMap = hit.getSourceAsMap();
    String documentTitle = (String) sourceAsMap.get("title");
    List<Object> users = (List<Object>) sourceAsMap.get("user");
    Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");

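The casts above throw a ClassCastException when a field is not of the expected type. As a hedged alternative, the following JDK-only sketch reads values from a source map with type checks. `SourceMapSketch` and its helper methods are hypothetical, and the map in `main` is a hand-built stand-in for what `hit.getSourceAsMap()` would return.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Type-checked access to a document source map; helper names are made up. */
final class SourceMapSketch {

    /** Returns the value under key as a String, or null if absent or of another type. */
    static String stringField(Map<String, Object> source, String key) {
        Object value = source.get(key);
        return value instanceof String ? (String) value : null;
    }

    /** Returns the value under key as a list, or an empty list if absent or of another type. */
    static List<?> listField(Map<String, Object> source, String key) {
        Object value = source.get(key);
        return value instanceof List ? (List<?>) value : Collections.emptyList();
    }

    /** Returns the nested object under key, or an empty map if absent or of another type. */
    static Map<?, ?> objectField(Map<String, Object> source, String key) {
        Object value = source.get(key);
        return value instanceof Map ? (Map<?, ?>) value : Collections.emptyMap();
    }

    public static void main(String[] args) {
        // Hand-built stand-in for hit.getSourceAsMap(); the values are invented.
        Map<String, Object> inner = new HashMap<>();
        inner.put("key", "value");
        Map<String, Object> source = new HashMap<>();
        source.put("title", "Elasticsearch notes");
        source.put("user", Arrays.asList("kimchy", "luca"));
        source.put("innerObject", inner);

        System.out.println(stringField(source, "title"));
        System.out.println(listField(source, "user").size());
        System.out.println(objectField(source, "innerObject").get("key"));
    }
}
```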
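How the Search API Classes Relate
The QueryBuilder, SearchSourceBuilder and SearchRequest described above are nested in one another: a QueryBuilder is set on a SearchSourceBuilder, which in turn is set as the source of a SearchRequest. See the figure below: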
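What is a full-text query?
High-level queries such as match and query_string are full-text queries. They know how a field is mapped and treat the query string accordingly:
When querying a date or integer field, the query string is treated as a date or an integer, respectively.
When querying a not_analyzed exact-value string field (a keyword field in Elasticsearch 6.x), the whole query string is treated as a single term.
When querying an analyzed full-text field (a text field in Elasticsearch 6.x), the query string is first passed through the appropriate analyzer, which produces the list of terms to search for.
Once the term list has been produced, a low-level query is executed for each term. The results are then combined and a final relevance score is computed for each document.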
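Match
What steps does a match query on a single word go through?
Check the field type: determine whether the field is analyzed or not_analyzed.
Analyze the query string: if it yields a single term, the match query executes as a single low-level term query.
Find matching documents: the term is looked up in the inverted index, which returns the set of documents that contain it.
Score each document.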
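The analysis and lookup steps above can be sketched in plain Java. This is a toy model under stated assumptions, not how Elasticsearch or Lucene is implemented: `analyze` is a made-up analyzer that lowercases and splits on non-alphanumeric characters, the inverted index is an in-memory map, and the scoring step is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of a single-term match query; all names are hypothetical. */
final class MatchFlowSketch {

    /** Toy analyzer: lowercases the text and splits it on non-letter, non-digit characters. */
    static List<String> analyze(String text) {
        List<String> terms = new ArrayList<>();
        for (String token : text.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{N}]+")) {
            if (!token.isEmpty()) {
                terms.add(token);
            }
        }
        return terms;
    }

    /** Builds an inverted index: term -> ids (list positions) of the documents containing it. */
    static Map<String, Set<Integer>> buildIndex(List<String> docs) {
        Map<String, Set<Integer>> index = new HashMap<>();
        for (int docId = 0; docId < docs.size(); docId++) {
            for (String term : analyze(docs.get(docId))) {
                index.computeIfAbsent(term, t -> new TreeSet<>()).add(docId);
            }
        }
        return index;
    }

    /** Analyzes the query string and looks up the single resulting term in the index. */
    static Set<Integer> matchSingleTerm(Map<String, Set<Integer>> index, String query) {
        List<String> terms = analyze(query);
        if (terms.size() != 1) {
            throw new IllegalArgumentException("this sketch only handles single-term queries");
        }
        return index.getOrDefault(terms.get(0), Collections.emptySet());
    }
}
```

Because the same analyzer runs at index time and at query time, a query for `QUICK` finds documents that contain `quick` or `Quick`.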
Building a Match Query
A match query accepts text, numeric or date values, analyzes them, and constructs a query.

    GET /_search
    {
      "query": {
        "match": {
          "message": "this is a test"
        }
      }
    }

In the example above, message is the name of a field.
The corresponding QueryBuilder class is MatchQueryBuilder.
The factory method is QueryBuilders.matchQuery().
Search Query | QueryBuilder Class | Method in QueryBuilders |
---|---|---|
Match | MatchQueryBuilder | QueryBuilders.matchQuery() |
Match Phrase | MatchPhraseQueryBuilder | QueryBuilders.matchPhraseQuery() |
Match Phrase Prefix | MatchPhrasePrefixQueryBuilder | QueryBuilders.matchPhrasePrefixQuery() |
Multi Match | MultiMatchQueryBuilder | QueryBuilders.multiMatchQuery() |
Common Terms | CommonTermsQueryBuilder | QueryBuilders.commonTermsQuery() |
Query String | QueryStringQueryBuilder | QueryBuilders.queryStringQuery() |
Simple Query String | SimpleQueryStringBuilder | QueryBuilders.simpleQueryStringQuery() |
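Term-Level Queries
Queries of this type do not analyze their input. They operate on single terms: they look up the exact term in the inverted index (an exact match) and compute a relevance _score for every document that contains it. The similarity used is BM25 by default in Elasticsearch 6.x; versions before 5.0 used TF/IDF.
Term
The term query is used for exact-value matching. The value can be a number, a date, a boolean, or a not_analyzed string (a keyword field in Elasticsearch 6.x).
The corresponding QueryBuilder class is TermQueryBuilder.
The factory method is QueryBuilders.termQuery().
Terms
The terms query lets you specify several values to match. A document satisfies the query if the field contains any one of the given values.
The corresponding QueryBuilder class is TermsQueryBuilder.
The factory method is QueryBuilders.termsQuery().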
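Wildcard
The wildcard query is a low-level, term-based query that lets you specify a pattern to match. The pattern is not a regular expression; it uses the standard shell wildcards:
? matches any single character
* matches zero or more characters
A wildcard query has to scan the list of terms in the inverted index to find all matching terms, and then collects the document IDs for each of those terms.
Because wildcard and regexp patterns can only be matched against terms at query time, these queries are comparatively slow and should be used with care where performance matters.
The corresponding QueryBuilder class is WildcardQueryBuilder.
The factory method is QueryBuilders.wildcardQuery().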
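To illustrate the two wildcard characters and why every term has to be inspected, here is a small JDK-only sketch. It translates a wildcard pattern into a `java.util.regex.Pattern` and scans a list of terms. `WildcardSketch` and the term list are hypothetical; Lucene matches terms with automata rather than with this loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Illustrates ? and * semantics over a term list; not the Lucene implementation. */
final class WildcardSketch {

    /** Translates a wildcard pattern into an equivalent regular expression. */
    static Pattern toRegex(String wildcard) {
        StringBuilder regex = new StringBuilder();
        for (char c : wildcard.toCharArray()) {
            if (c == '?') {
                regex.append('.');          // exactly one character
            } else if (c == '*') {
                regex.append(".*");         // zero or more characters
            } else {
                // Everything else is literal, including regex metacharacters.
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    /** Scans every term in the dictionary and keeps those matching the whole pattern. */
    static List<String> matchingTerms(List<String> termDictionary, String wildcard) {
        Pattern pattern = toRegex(wildcard);
        List<String> matches = new ArrayList<>();
        for (String term : termDictionary) {
            if (pattern.matcher(term).matches()) {
                matches.add(term);
            }
        }
        return matches;
    }
}
```

The full scan in `matchingTerms` is the cost the text warns about: the work grows with the number of distinct terms in the field, not with the number of results.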
Search Query | QueryBuilder Class | Method in QueryBuilders |
---|---|---|
Term | TermQueryBuilder | QueryBuilders.termQuery() |
Terms | TermsQueryBuilder | QueryBuilders.termsQuery() |
Range | RangeQueryBuilder | QueryBuilders.rangeQuery() |
Exists | ExistsQueryBuilder | QueryBuilders.existsQuery() |
Prefix | PrefixQueryBuilder | QueryBuilders.prefixQuery() |
Wildcard | WildcardQueryBuilder | QueryBuilders.wildcardQuery() |
Regexp | RegexpQueryBuilder | QueryBuilders.regexpQuery() |
Fuzzy | FuzzyQueryBuilder | QueryBuilders.fuzzyQuery() |
Type | TypeQueryBuilder | QueryBuilders.typeQuery() |
Ids | IdsQueryBuilder | QueryBuilders.idsQuery() |
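Compound Queries
What is a compound query?
A compound query wraps other compound queries or leaf queries and executes them as nested clauses. Its result combines the matches and scores of its sub-queries. The main kinds are:
constant_score query
Commonly used when only a filter is needed. Every matching document receives the same constant score.
bool query
Combines multiple leaf or compound queries. It accepts the following clauses:
must: the document must match these clauses to be included.
must_not: the document must not match these clauses to be included.
should: each matching clause increases the score. When the bool query also has a must or filter clause, a document that matches no should clause is still returned; when it has neither, at least one should clause has to match.
filter: the document must match, but the clause runs in filter mode and does not contribute to the score.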
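The interplay of the four clause types can be sketched as a small scoring function. This is a simplified model under assumptions, not the Lucene implementation: scoring clauses are functions that return a positive score on a match and 0 otherwise, the final score is the plain sum of the matching must and should scores, and `BoolQuerySketch` is a hypothetical name.

```java
import java.util.List;
import java.util.OptionalDouble;
import java.util.function.Predicate;
import java.util.function.ToDoubleFunction;

/** Simplified model of bool query matching and scoring; names are made up. */
final class BoolQuerySketch {

    /**
     * Returns the document's score, or an empty result when the document
     * does not match the bool query at all.
     */
    static <D> OptionalDouble score(D doc,
                                    List<ToDoubleFunction<D>> must,
                                    List<Predicate<D>> filter,
                                    List<Predicate<D>> mustNot,
                                    List<ToDoubleFunction<D>> should) {
        double total = 0.0;
        for (ToDoubleFunction<D> clause : must) {
            double s = clause.applyAsDouble(doc);
            if (s <= 0.0) {
                return OptionalDouble.empty();   // every must clause has to match
            }
            total += s;                          // and contributes to the score
        }
        for (Predicate<D> clause : filter) {
            if (!clause.test(doc)) {
                return OptionalDouble.empty();   // has to match, adds no score
            }
        }
        for (Predicate<D> clause : mustNot) {
            if (clause.test(doc)) {
                return OptionalDouble.empty();   // excludes the document
            }
        }
        boolean anyShouldMatched = false;
        for (ToDoubleFunction<D> clause : should) {
            double s = clause.applyAsDouble(doc);
            if (s > 0.0) {
                anyShouldMatched = true;
                total += s;                      // optional bonus
            }
        }
        // With no must and no filter clause, at least one should clause must match.
        boolean shouldRequired = must.isEmpty() && filter.isEmpty() && !should.isEmpty();
        if (shouldRequired && !anyShouldMatched) {
            return OptionalDouble.empty();
        }
        return OptionalDouble.of(total);
    }
}
```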
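dis_max query
The disjunction max query returns every document that matches any of its sub-queries, but uses only the score of the best-matching sub-query as the document's final score.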
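As a worked illustration of "best match wins", the following sketch computes a dis_max style score from the scores of the individual sub-queries. The `tieBreaker` parameter goes beyond what the text above describes: it mirrors the query's `tie_breaker` option (default 0), which lets the other matching sub-queries contribute a fraction of their scores. `DisMaxSketch` is a hypothetical name and the formula is stated to the best of my understanding of the documented behavior.

```java
/** Computes a dis_max style score from per-sub-query scores; illustrative only. */
final class DisMaxSketch {

    /**
     * @param subScores  score of each sub-query for one document (0 if it did not match)
     * @param tieBreaker fraction of the non-best scores to add; 0 means "best score only"
     */
    static double disMaxScore(double[] subScores, double tieBreaker) {
        double max = 0.0;
        double sum = 0.0;
        for (double s : subScores) {
            max = Math.max(max, s);
            sum += s;
        }
        // Best score, plus a fraction of everything that was not the best score.
        return max + tieBreaker * (sum - max);
    }

    public static void main(String[] args) {
        double[] scores = {1.0, 3.0, 2.0};
        System.out.println(disMaxScore(scores, 0.0));
        System.out.println(disMaxScore(scores, 0.5));
    }
}
```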
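function_score query
Applies a function to every document matched by the main query. It can be used to modify or even replace the original score.
boosting query
Controls the weight of sub-queries: it returns documents that match a positive query while lowering the score of those that also match a negative query.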
Search Query | QueryBuilder Class | Method in QueryBuilders |
---|---|---|
Constant Score | ConstantScoreQueryBuilder | QueryBuilders.constantScoreQuery() |
Bool | BoolQueryBuilder | QueryBuilders.boolQuery() |
Dis Max | DisMaxQueryBuilder | QueryBuilders.disMaxQuery() |
Function Score | FunctionScoreQueryBuilder | QueryBuilders.functionScoreQuery() |
Boosting | BoostingQueryBuilder | QueryBuilders.boostingQuery() |
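Specialized Queries
Wrapper Query
The most important one here is the wrapper query, which accepts any other query as a base64-encoded string.
With the Java High Level REST Client its main use is to pass a query as a JSON string. For example, you can assemble the query with a JSON library such as Gson and hand the resulting string directly to the wrapper query, which is very convenient.
The corresponding QueryBuilder class is WrapperQueryBuilder.
The factory method is QueryBuilders.wrapperQuery().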
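When the wrapper query is sent over plain REST instead of through `QueryBuilders.wrapperQuery()`, the inner query has to be base64-encoded by the caller. The sketch below shows that encoding step with the JDK only. `WrapperQuerySketch` and its methods are hypothetical helpers, and the request body is built by string concatenation purely for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Base64 handling for a wrapper query body; helper names are made up. */
final class WrapperQuerySketch {

    /** Encodes a JSON query string as base64 (UTF-8 bytes, standard alphabet). */
    static String encode(String queryJson) {
        return Base64.getEncoder().encodeToString(queryJson.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes a base64 string back into the JSON query it carries. */
    static String decode(String base64) {
        return new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
    }

    /** Builds a search body whose query is a wrapper around the given JSON query. */
    static String wrapperBody(String queryJson) {
        // The standard base64 alphabet needs no JSON escaping.
        return "{\"query\":{\"wrapper\":{\"query\":\"" + encode(queryJson) + "\"}}}";
    }

    public static void main(String[] args) {
        String inner = "{\"term\":{\"user\":\"kimchy\"}}";
        System.out.println(wrapperBody(inner));
    }
}
```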

    public class IndexBean {
        // index name
        private String index;
        // index type
        private String indexType;
        // index doc id
        private String docId;
        // "1" = IndexRequest, "2" = UpdateRequest, "3" = DeleteRequest
        private String operateType;

        public String getOperateType() {
            return operateType;
        }

        public void setOperateType(String operateType) {
            this.operateType = operateType;
        }

        public String getIndex() {
            return index;
        }

        public void setIndex(String index) {
            this.index = index;
        }

        public String getIndexType() {
            return indexType;
        }

        public void setIndexType(String indexType) {
            this.indexType = indexType;
        }

        public String getDocId() {
            return docId;
        }

        public void setDocId(String docId) {
            this.docId = docId;
        }
    }


    /**
     * Custom exception type for the Elasticsearch helper classes.
     */
    public class ElasticsearchException extends RuntimeException {

        public ElasticsearchException(String s, Exception e) {
            super(s, e);
        }

        public ElasticsearchException(String s) {
            super(s);
        }
    }


    import org.apache.http.HttpHost;
    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.action.get.GetRequest;
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.action.support.WriteRequest;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.action.update.UpdateResponse;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.Strings;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.index.VersionType;
    import org.elasticsearch.index.query.MatchQueryBuilder;
    import org.elasticsearch.index.query.QueryBuilder;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.index.query.TermQueryBuilder;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
    import org.elasticsearch.search.sort.SortBuilder;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    /**
     * Helper methods for common Elasticsearch operations.
     */
    public class ElasticSearchClient {

        private final String[] hostsAndPorts;
        // Created on first use and then reused, so that not every call opens a new client.
        private RestHighLevelClient client;

        public ElasticSearchClient(String[] hostsAndPorts) {
            this.hostsAndPorts = hostsAndPorts;
        }

        /** Returns the shared client; hosts are given as "host:port", default 127.0.0.1:9200. */
        public synchronized RestHighLevelClient getClient() {
            if (client != null) {
                return client;
            }
            List<HttpHost> httpHosts = new ArrayList<HttpHost>();
            if (hostsAndPorts != null && hostsAndPorts.length > 0) {
                for (String hostAndPort : hostsAndPorts) {
                    String[] hp = hostAndPort.split(":");
                    httpHosts.add(new HttpHost(hp[0], Integer.parseInt(hp[1]), "http"));
                }
                client = new RestHighLevelClient(
                        RestClient.builder(httpHosts.toArray(new HttpHost[0])));
            } else {
                client = new RestHighLevelClient(
                        RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));
            }
            return client;
        }

        private IndexRequest getIndexRequest(String index, String indexType, String docId) {
            if (null == index || null == indexType) {
                throw new ElasticsearchException("index or indexType must not be null");
            }
            // Without a docId, Elasticsearch generates one.
            return null == docId
                    ? new IndexRequest(index, indexType)
                    : new IndexRequest(index, indexType, docId);
        }

        /** Indexes a document synchronously. */
        public IndexResponse execIndex(String index, String indexType, String docId,
                                       Map<String, Object> dataMap) throws IOException {
            return getClient().index(getIndexRequest(index, indexType, docId).source(dataMap));
        }

        /** Indexes a document asynchronously; the listener receives the result or the failure. */
        public void asyncExecIndex(String index, String indexType, String docId,
                                   Map<String, Object> dataMap,
                                   ActionListener<IndexResponse> indexResponseActionListener)
                throws IOException {
            getClient().indexAsync(getIndexRequest(index, indexType, docId).source(dataMap),
                    indexResponseActionListener);
        }

        /**
         * Gets a document.
         *
         * @param includes    source fields to include in the response; may be null or empty
         * @param excludes    source fields to exclude from the response; may be null or empty
         * @param version     expected document version; may be null
         * @param versionType version type; may be null
         */
        public GetResponse getRequest(String index, String indexType, String docId,
                                      String[] includes, String[] excludes,
                                      Integer version, VersionType versionType) throws IOException {
            if (null == includes || includes.length == 0) {
                includes = Strings.EMPTY_ARRAY;
            }
            if (null == excludes || excludes.length == 0) {
                excludes = Strings.EMPTY_ARRAY;
            }
            GetRequest getRequest = new GetRequest(index, indexType, docId);
            FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
            getRequest.realtime(true);
            if (null != version) {
                getRequest.version(version);
            }
            if (null != versionType) {
                getRequest.versionType(versionType);
            }
            return getClient().get(getRequest.fetchSourceContext(fetchSourceContext));
        }

        /** Gets a document, returning only the selected source fields. */
        public GetResponse getRequest(String index, String indexType, String docId,
                                      String[] includes, String[] excludes) throws IOException {
            return getRequest(index, indexType, docId, includes, excludes, null, null);
        }

        /** Gets a document with its full source. */
        public GetResponse getRequest(String index, String indexType, String docId)
                throws IOException {
            return getClient().get(new GetRequest(index, indexType, docId));
        }

        /** Checks whether a document exists without fetching its source or stored fields. */
        public Boolean existDoc(String index, String indexType, String docId) throws IOException {
            GetRequest getRequest = new GetRequest(index, indexType, docId);
            getRequest.fetchSourceContext(new FetchSourceContext(false));
            getRequest.storedFields("_none_");
            return getClient().exists(getRequest);
        }

        /** Deletes a document; every optional setting may be null. */
        public DeleteResponse deleteDoc(String index, String indexType, String docId,
                                        TimeValue timeValue,
                                        WriteRequest.RefreshPolicy refreshPolicy,
                                        Integer version, VersionType versionType)
                throws IOException {
            DeleteRequest deleteRequest = new DeleteRequest(index, indexType, docId);
            if (null != timeValue) {
                deleteRequest.timeout(timeValue);
            }
            if (null != refreshPolicy) {
                deleteRequest.setRefreshPolicy(refreshPolicy);
            }
            if (null != version) {
                deleteRequest.version(version);
            }
            if (null != versionType) {
                deleteRequest.versionType(versionType);
            }
            return getClient().delete(deleteRequest);
        }

        /** Deletes a document with default settings. */
        public DeleteResponse deleteDoc(String index, String indexType, String docId)
                throws IOException {
            return deleteDoc(index, indexType, docId, null, null, null, null);
        }

        /** Updates a document; every optional setting may be null. */
        public UpdateResponse updateDoc(String index, String indexType, String docId,
                                        Map<String, Object> dataMap, TimeValue timeValue,
                                        WriteRequest.RefreshPolicy refreshPolicy,
                                        Integer version, VersionType versionType,
                                        Boolean docAsUpsert,
                                        String[] includes, String[] excludes) throws IOException {
            UpdateRequest updateRequest = new UpdateRequest(index, indexType, docId);
            updateRequest.doc(dataMap);
            if (null != timeValue) {
                updateRequest.timeout(timeValue);
            }
            if (null != refreshPolicy) {
                updateRequest.setRefreshPolicy(refreshPolicy);
            }
            if (null != version) {
                updateRequest.version(version);
            }
            if (null != versionType) {
                updateRequest.versionType(versionType);
            }
            if (null != docAsUpsert) {
                updateRequest.docAsUpsert(docAsUpsert);
            }
            // number of retries on a version conflict
            updateRequest.retryOnConflict(3);
            if (null == includes && null == excludes) {
                return getClient().update(updateRequest);
            }
            if (null == includes || includes.length == 0) {
                includes = Strings.EMPTY_ARRAY;
            }
            if (null == excludes || excludes.length == 0) {
                excludes = Strings.EMPTY_ARRAY;
            }
            return getClient().update(
                    updateRequest.fetchSource(new FetchSourceContext(true, includes, excludes)));
        }

        /** Updates the document, inserting it if it does not exist yet. */
        public UpdateResponse updateDocAsUpsert(String index, String indexType, String docId,
                                                Map<String, Object> dataMap) throws IOException {
            return updateDoc(index, indexType, docId, dataMap,
                    null, null, null, null, true, null, null);
        }

        /** Updates the document only if it already exists. */
        public UpdateResponse updateDoc(String index, String indexType, String docId,
                                        Map<String, Object> dataMap) throws IOException {
            return updateDoc(index, indexType, docId, dataMap,
                    null, null, null, null, false, null, null);
        }

        /** Executes a bulk request; timeValue and refreshPolicy may be null. */
        public BulkResponse bulkRequest(List<IndexBean> indexBeanList, TimeValue timeValue,
                                        WriteRequest.RefreshPolicy refreshPolicy)
                throws IOException {
            BulkRequest bulkRequest = getBulkRequest(indexBeanList);
            if (null != timeValue) {
                bulkRequest.timeout(timeValue);
            }
            if (null != refreshPolicy) {
                bulkRequest.setRefreshPolicy(refreshPolicy);
            }
            return getClient().bulk(bulkRequest);
        }

        // Note: IndexBean carries no document body, so index and update requests built here
        // have no source or doc attached and need one before Elasticsearch will accept them.
        private BulkRequest getBulkRequest(List<IndexBean> indexBeanList) {
            BulkRequest bulkRequest = new BulkRequest();
            indexBeanList.forEach(indexBean -> {
                if ("1".equals(indexBean.getOperateType())) {
                    bulkRequest.add(null != indexBean.getDocId()
                            ? new IndexRequest(indexBean.getIndex(), indexBean.getIndexType(),
                                    indexBean.getDocId())
                            : new IndexRequest(indexBean.getIndex(), indexBean.getIndexType()));
                } else if ("2".equals(indexBean.getOperateType())) {
                    // The original check was inverted and rejected every update that had a docId.
                    if (null == indexBean.getDocId()) {
                        throw new ElasticsearchException("update action docId must not be null");
                    }
                    bulkRequest.add(new UpdateRequest(indexBean.getIndex(),
                            indexBean.getIndexType(), indexBean.getDocId()));
                } else if ("3".equals(indexBean.getOperateType())) {
                    if (null == indexBean.getDocId()) {
                        throw new ElasticsearchException("delete action docId must not be null");
                    }
                    bulkRequest.add(new DeleteRequest(indexBean.getIndex(),
                            indexBean.getIndexType(), indexBean.getDocId()));
                } else {
                    throw new ElasticsearchException(
                            "OperateType " + indexBean.getOperateType() + " is not supported");
                }
            });
            return bulkRequest;
        }

        /** Executes a bulk request with default settings. */
        public BulkResponse bulkRequest(List<IndexBean> indexBeanList) throws IOException {
            return bulkRequest(indexBeanList, null, null);
        }

        /** Executes a bulk request asynchronously. */
        public void asyncBulkRequest(List<IndexBean> indexBeanList,
                                     ActionListener<BulkResponse> bulkResponseActionListener) {
            getClient().bulkAsync(getBulkRequest(indexBeanList), bulkResponseActionListener);
        }

        private SearchRequest getSearchRequest(String index, String indexType) {
            if (null == index) {
                throw new ElasticsearchException("index name must not be null");
            }
            SearchRequest searchRequest = new SearchRequest(index);
            if (null != indexType) {
                // SearchRequest(String...) takes index names only, so the type is set separately.
                searchRequest.types(indexType);
            }
            return searchRequest;
        }

        // The original code called query() twice, so the match query silently replaced the
        // term query. Combining both in a bool query is one way to honor both; applying the
        // term as a non-scoring filter is a choice made here, not something the source states.
        private static QueryBuilder combine(TermQueryBuilder term, MatchQueryBuilder match) {
            if (null != term && null != match) {
                return QueryBuilders.boolQuery().filter(term).must(match);
            }
            return null != match ? match : term;
        }

        private SearchSourceBuilder getSearchSourceBuilder(Integer from, Integer size,
                                                           TermQueryBuilder termQueryBuilder,
                                                           MatchQueryBuilder matchQueryBuilder,
                                                           String sortField,
                                                           SortBuilder<?> sortBuilder,
                                                           Boolean fetchSource) {
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            QueryBuilder query = combine(termQueryBuilder, matchQueryBuilder);
            if (null != query) {
                searchSourceBuilder.query(query);
            }
            if (null != from) {
                searchSourceBuilder.from(from);
            }
            if (null != size) {
                searchSourceBuilder.size(size);
            }
            if (null != sortField) {
                searchSourceBuilder.sort(sortField);
            }
            if (null != sortBuilder) {
                searchSourceBuilder.sort(sortBuilder);
            }
            // search timeout
            searchSourceBuilder.timeout(new TimeValue(120, TimeUnit.SECONDS));
            if (null != fetchSource) {
                searchSourceBuilder.fetchSource(fetchSource);
            }
            return searchSourceBuilder;
        }

        private SearchRequest buildSearch(String index, String indexType, Integer from,
                                          Integer size, TermQueryBuilder term,
                                          MatchQueryBuilder match, String sortField,
                                          SortBuilder<?> sortBuilder, Boolean fetchSource) {
            return getSearchRequest(index, indexType).source(getSearchSourceBuilder(
                    from, size, term, match, sortField, sortBuilder, fetchSource));
        }

        private static MatchQueryBuilder requireMatch(MatchQueryBuilder matchQueryBuilder) {
            if (null == matchQueryBuilder) {
                throw new ElasticsearchException("matchQueryBuilder is null");
            }
            return matchQueryBuilder;
        }

        /** Searches the whole index (and type, if given) without a query. */
        public SearchResponse searchRequest(String index, String indexType) throws IOException {
            return getClient().search(getSearchRequest(index, indexType));
        }

        /** Paged search with a term query. */
        public SearchResponse searchRequest(String index, String indexType, Integer from,
                                            Integer size, TermQueryBuilder termQueryBuilder)
                throws IOException {
            return getClient().search(buildSearch(index, indexType, from, size,
                    termQueryBuilder, null, null, null, null));
        }

        /** Paged search with a term query and a match query. */
        public SearchResponse searchRequest(String index, String indexType, Integer from,
                                            Integer size, TermQueryBuilder termQueryBuilder,
                                            MatchQueryBuilder matchQueryBuilder)
                throws IOException {
            return getClient().search(buildSearch(index, indexType, from, size,
                    termQueryBuilder, requireMatch(matchQueryBuilder), null, null, null));
        }

        /** Paged search with a match query. */
        public SearchResponse searchRequest(String index, String indexType, Integer from,
                                            Integer size, MatchQueryBuilder matchQueryBuilder)
                throws IOException {
            return getClient().search(buildSearch(index, indexType, from, size,
                    null, requireMatch(matchQueryBuilder), null, null, null));
        }

        /** Paged search with a match query, sorted by a field. */
        public SearchResponse searchRequest(String index, String indexType, Integer from,
                                            Integer size, MatchQueryBuilder matchQueryBuilder,
                                            String sortField) throws IOException {
            return getClient().search(buildSearch(index, indexType, from, size,
                    null, requireMatch(matchQueryBuilder), sortField, null, null));
        }

        /** Paged search with a match query, sorted by a field; fetchSource toggles _source. */
        public SearchResponse searchRequest(String index, String indexType, Integer from,
                                            Integer size, MatchQueryBuilder matchQueryBuilder,
                                            String sortField, Boolean fetchSource)
                throws IOException {
            return getClient().search(buildSearch(index, indexType, from, size,
                    null, requireMatch(matchQueryBuilder), sortField, null, fetchSource));
        }

        /** Paged search with a match query, sorted by a SortBuilder. */
        public SearchResponse searchRequest(String index, String indexType, Integer from,
                                            Integer size, MatchQueryBuilder matchQueryBuilder,
                                            SortBuilder<?> sortBuilder) throws IOException {
            return getClient().search(buildSearch(index, indexType, from, size,
                    null, requireMatch(matchQueryBuilder), null, sortBuilder, null));
        }

        /** Paged search with term and match queries, sorted by a field. */
        public SearchResponse searchRequest(String index, String indexType, Integer from,
                                            Integer size, TermQueryBuilder termQueryBuilder,
                                            MatchQueryBuilder matchQueryBuilder,
                                            String sortField) throws IOException {
            return getClient().search(buildSearch(index, indexType, from, size,
                    termQueryBuilder, requireMatch(matchQueryBuilder), sortField, null, null));
        }

        /** Paged search with term and match queries and a SortBuilder; fetchSource toggles _source. */
        public SearchResponse searchRequest(String index, String indexType, Integer from,
                                            Integer size, TermQueryBuilder termQueryBuilder,
                                            MatchQueryBuilder matchQueryBuilder,
                                            SortBuilder<?> sortBuilder, Boolean fetchSource)
                throws IOException {
            return getClient().search(buildSearch(index, indexType, from, size,
                    termQueryBuilder, requireMatch(matchQueryBuilder), null, sortBuilder,
                    fetchSource));
        }

        /** Paged search with term and match queries, sorted by a SortBuilder. */
        public SearchResponse searchRequest(String index, String indexType, Integer from,
                                            Integer size, TermQueryBuilder termQueryBuilder,
                                            MatchQueryBuilder matchQueryBuilder,
                                            SortBuilder<?> sortBuilder) throws IOException {
            return getClient().search(buildSearch(index, indexType, from, size,
                    termQueryBuilder, requireMatch(matchQueryBuilder), null, sortBuilder, null));
        }

        /** Paged search with term and match queries, sorted by a field; fetchSource toggles _source. */
        public SearchResponse searchRequest(String index, String indexType, Integer from,
                                            Integer size, TermQueryBuilder termQueryBuilder,
                                            MatchQueryBuilder matchQueryBuilder,
                                            String sortField, Boolean fetchSource)
                throws IOException {
            return getClient().search(buildSearch(index, indexType, from, size,
                    termQueryBuilder, requireMatch(matchQueryBuilder), sortField, null,
                    fetchSource));
        }

        /** Asynchronous search with term and match queries, sorted by a SortBuilder. */
        public void asyncSearchRequest(String index, String indexType, Integer from,
                                       Integer size, TermQueryBuilder termQueryBuilder,
                                       MatchQueryBuilder matchQueryBuilder,
                                       SortBuilder<?> sortBuilder,
                                       ActionListener<SearchResponse> listener)
                throws IOException {
            getClient().searchAsync(buildSearch(index, indexType, from, size,
                    termQueryBuilder, requireMatch(matchQueryBuilder), null, sortBuilder, null),
                    listener);
        }

        /** Asynchronous search with term and match queries, sorted by a field. */
        public void asyncSearchRequest(String index, String indexType, Integer from,
                                       Integer size, TermQueryBuilder termQueryBuilder,
                                       MatchQueryBuilder matchQueryBuilder,
                                       String sortField,
                                       ActionListener<SearchResponse> listener)
                throws IOException {
            getClient().searchAsync(buildSearch(index, indexType, from, size,
                    termQueryBuilder, requireMatch(matchQueryBuilder), sortField, null, null),
                    listener);
        }
    }

II. transportClient API
To be continued; this part will be written up in the near future.