Settings esSettings = Settings.builder() .put("cluster.name", clusterName) //设置ES实例的名称 .put("client.transport.sniff", true) //自动嗅探整个集群的状态,把集群中其他ES节点的ip添加到本地的客户端列表中 .build(); client = new PreBuiltTransportClient(esSettings);//初始化client较老版本发生了变化,此方法有几个重载方法,初始化插件等。 //此步骤添加IP,至少一个,其实一个就够了,因为添加了自动嗅探配置 client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip), esPort));
现在是在代码中配置,如果交给IOC托管,查找一下看看在哪里配置
配置详解:
自动嗅探设置生成JSON格式文档的方式&使用prepareIndex来进行存储
ES中的id可以由我们自己指定,也可以自己指定 (这是一个重载方法)
---单个文档插入
client.prepareIndex("fendo", "fendodate") .setSource(json) .get();//执行
我使用Gson将对象转json,然后添加 XContentType.JSON
client.prepareIndex(indexName, type,id).setSource(data, XContentType.JSON).get();
刚开始以为只要是json就可以,但是没有解决
client.prepareIndex(indexName, type,id).setSource(data).get();
但是总是报错,就看了一下源码
if(source.length % 2 != 0) { throw new IllegalArgumentException("The number of object passed must be even but was [" + source.length + "]"); } else if(source.length == 2 && source[0] instanceof BytesReference && source[1] instanceof Boolean) { throw new IllegalArgumentException("you are using the removed method for source with bytes and unsafe flag, the unsafe flag was removed, please just use source(BytesReference)"); }
这个没有看懂(不知道为什么要这么检查),有种硬编码的赶脚
最终还是使用了XContentBuilder来构建
这里面的i++和++i用的很好---记一下
for(int i = 0; i < source.length; ++i) { builder.field(source[i++].toString(), source[i]); }
BulkRequestBuilder bulkRequest = client.prepareBulk(); bulkRequest.add(client.prepareIndex("index","type","id") .setSource("json", XContentType.JSON)); BulkResponse bulkResponse = bulkRequest.get();
可以写一个循环,将我们的数据批量插入,一般来说这个操作都是在全量同步数据使用。
但是
这个批量操作并不是原子性的,也没有事务,个人感觉减少了通信次数。
根据index type id 查询数据
GetResponse response = client.prepareGet("index", "type", "id").get();
但是一般都是根据条件来查询 使用到id精确查询的不多
解析response
Map<String, Object> source = response.getSource();//默认返回的是一个map Map<String, Object> sourceAsMap = response.getSourceAsMap();// 指定返回格式 String sourceAsString = response.getSourceAsString();//尝试看看能不能转换成Java Bean boolean sourceEmpty = response.isSourceEmpty();//判空操作
返回map
可以使用这个工具类实现map<String,Object> 和 Java Bean的转换
Java Bean与Map<String,Object>相互转换删除操作 还是利用的 index type id 这种场景真的少,
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get(); String s = response.toString(); RestStatus status = response.status(); int status1 = status.getStatus();//查看删除是否成功
BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client) .filter(QueryBuilders.matchQuery("gender", "male")) //查询条件 .source("persons") //index(索引名) .get(); //执行
获取失败信息
List<ScrollableHitSource.SearchFailure> searchFailures = response.getSearchFailures(); for (ScrollableHitSource.SearchFailure searchFailure : searchFailures) { Throwable reason = searchFailure.getReason();//获取失败的异常信息 String index = searchFailure.getIndex(); Integer shardId = searchFailure.getShardId(); String nodeId = searchFailure.getNodeId(); }
获取删除的文档数量
long deleted = response.getDeleted();
异步操作,监听结果回调
DeleteByQueryAction.INSTANCE.newRequestBuilder(client) .filter(QueryBuilders.matchQuery("gender", "male")) //查询 .source("persons") //index(索引名) .execute(new ActionListener<BulkByScrollResponse>() { //回调监听 @Override public void onResponse(BulkByScrollResponse response) { long deleted = response.getDeleted(); //删除文档的数量 } @Override public void onFailure(Exception e) { // Handle the exception } });
利用CountDownLatch实现主线程接收数据
两种方式
UpdateRequest prepareUpdate()
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index("index"); updateRequest.type("type"); updateRequest.id("1"); updateRequest.doc(jsonBuilder() .startObject() .field("gender", "male") .endObject()); client.update(updateRequest).get();
一般也不是根据id更新,当然如果在数据库表中存放了es中的id也是可以这样操作的。~~~~