很杂, 涉及到最近处理的一些点
一个字段, resp_time
, mapping中是string, 有需求是, 按照响应时间降序排序, 此时需要构造qsl(在search中使用), 使用该字段转换为浮点数, 降序排列
第一步, 修改es配置, 增加groovy支持
elasticsearch.yml中加入
script.engine.groovy.inline.search: on
然后, 执行rolling restart, 逐一重启集群每个节点
第二步, 构造qsl, sort
中, 增加 _script
使用groovy脚本, 将对应字段从string转成数字, 再进行排序
'sort': [{'_script': {'lang': 'groovy', 'order': 'desc', 'script': 'Float.parseFloat(doc["resp_time"].value)', 'type': 'number'}}, {'@timestamp': 'desc'} ]
附 scripting文档
fielddata-format-disabled
导致的排序失效 有个集群, 升级后, 发现 resp_time
字段的mapping是
"resp_time" : { "type" : "string", "norms" : { "enabled" : false }, "fielddata" : { "format" : "disabled" }, "fields" : { "raw" : { "type" : "string", "index" : "not_analyzed", "ignore_above" : 256 } }
注意这里的, 是因为升级es 2.0之后, 默认值变更带来的问题
"fielddata" : { "format" : "disabled" },
fielddata文档
此时, 排序的qsl将会报错, 无法按照对应要求排序
Field data loading is forbidden on resp_time
解决方案, 挺简单的, 使用 foo.raw
即可
'sort': [{'_script': {'lang': 'groovy', 'order': 'desc', 'script': 'Float.parseFloat(doc["resp_time.raw"].value)', 'type': 'number'}}, {'@timestamp': 'desc'} ]
把string类型的 resp_time
放到 aggs
中做聚合的时候.
"aggs": { "resp_time_stats": {"stats": {"script": 'Float.parseFloat(doc["resp_time.raw"].value)'}} }
此时, 会报错
{u'error': {u'failed_shards': [{u'index': u'logstash-2016.04.10', u'node': u'AvemqKN-RGKy68zJXUapBg', u'reason': {u'reason': u'scripts of type [inline], operation [aggs] and lang [groovy] are disabled', u'type': u'script_exception'}, u'shard': 0}], u'grouped': True, u'phase': u'query', u'reason': u'all shards failed', u'root_cause': [{u'reason': u'scripts of type [inline], operation [aggs] and lang [groovy] are disabled', u'type': u'script_exception'}], u'type': u'search_phase_execution_exception'}, u'status': 500}
处理, es加配置, 逐一重启
script.engine.groovy.inline.aggs: on
相关 文档
默认的一些pattern, 见 grok-patterns
grok检查在线实时编辑, https://grokdebug.herokuapp.com/
配置, 具体见 multiline文档
input { codec => multiline { patterns_dir => "./patterns" pattern => "" what => "previous" negate => true max_lines => 100 max_bytes => "50kib" } }
单位 bytes
实践中, 使用 max_bytes
, 当 what=previous + negate=true
的情况下, 即不匹配模式的, 归属前一部分, 这种情况下, 性能ok, 反之 what=next + negate=true
的情况下, 不匹配成功归属于后半部分, 此时产生的cpu消耗非常之大, 可以将一台机器跑满.
另外, 假设配置 max_bytes=1M
, 此时用户打了50M, 会给这个event打上tag multiline_codec_max_bytes_reache
, 但是, 这50M 最终还是会经logstash灌入到es里面. 即, 超了, 但是并不自动截掉
这时候, 我们可以, 使用 mutate-replace
直接替换掉
# if multiline_codec_max_lines_reached if ("multiline_codec_max_bytes_reached" in [tags]) { mutate { replace => { "message" => "Log System Warnning: multiline_codec_max_lines_reached, Your log has exceeded 50kB(51200 chars), it was blocked by log system. Please check your code to make your log info shorter and useful" "msg" => "Log System Warnning: multiline_codec_max_lines_reached, Your log has exceeded 50kB(51200 chars), it was blocked by log system. Please check your code to make your log info shorter and useful" } } }
之前提到, 升级集群后, 使用supervisord统一管理logstash进程,链接
有时, 需要上机器看看对应采集端所有logstash进程是否存在问题, 常常用到 top
命令, 所以写了个简单的脚本, 配合supervisord的脚本使用
ltop.sh
#!/bin/bash ./logstashd.sh status top -p $(./logstashd.sh status | awk '{print $4}' | awk -F',' '{print $1}' | tr '/n' ',' | sed 's/,$//g')
#!/bin/bash BASEDIR=$(dirname $0) cd $BASEDIR CURRENT_DIR=`pwd` exec >> /tmp/log/monitor.log 2>&1 echo "==============================================" date function check() { PNAME=$1 PID=$2 CPU_USE=$(ps -p $PID -o %cpu | sed -n '2p') INT_CPU_USE=$(printf "%.0f/n" $CPU_USE) echo $PNAME" - "$CPU_USE" - "$INT_CPU_USE if [ $INT_CPU_USE -gt 85 ] then echo "$PNAME cpu usage greater than 85%,do restart" ./logstashd.sh restart $PNAME fi } export -f check ./logstashd.sh status | awk '{print "-", $1, $4}' | awk -F',' '{print $1}' | xargs -n3 bash -c 'check $@'
机器节点本身有1T 硬盘, 由两块盘组成, 配置es的时候, 数据分别写到了两个盘上, 然后有一天集群状态告警了
"status" : "yellow",
查看es的日志
[2016-03-21 12:43:45,934][INFO ][cluster.routing.allocation.decider] [node_01] low disk watermark [85%] exceeded on [AvemqKN-RGKy68zJXUapBg][node_01][/data/LogNewData/xxx/nodes/0] free: 75.5gb[14.1%], replicas will not be assigned to this node
处理: 腾磁盘空间出来, es会自动检测恢复
PS: 磁盘大小要预估好
历史遗留问题, 有些节点采集发送到redis的key, 在indexer阶段并没有被消费, 导致越堆越多....
这时候, 可以通过redis查下哪些队列堆积了
bin/redis-cli -h 127.0.0.1 -p 6379 -a blueking_log --bigkeys
需要redis版本支持 bigkeys
=> This is a "new" feature beginning with 2.8
grok解析失败, 丢弃
if ("_grokparsefailure" in [tags]) { drop {} }
有时候, 需要禁止采集某些文件, 但由于 file
类型的 exclude
只能用文件名, 而没有更强大的规则, 所以只能采集进来再丢弃, 此时, 可以根据路径grok解析出关键字, 然后判断丢弃
if ([keyworod] in ["data", "not_exists"]) { drop {} }
默认情况, 有可能把所有cpu跑满, 这时候, 可以专门加下
-w, --pipeline-workers COUNT Sets the number of pipeline workers to run. (default: 24) logstash agent -f conf/xxx.conf -w 2
health.sh
#!/bin/bash curl 'http://127.0.0.1:9200/_cluster/health?pretty=true'
indices.sh
#!/bin/bash curl 'http://127.0.0.1:9200/_cat/indices?v' | sort -k 3