Notes and reminders on Elasticsearch installation and maintenance, and on writing Filebeat modules.

Installation and Configuration

CentOS

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
echo '[elasticsearch-5.x]
name=Elasticsearch repository for 5.x packages
baseurl=https://artifacts.elastic.co/packages/5.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md' > /etc/yum.repos.d/elasticsearch.repo
yum install elasticsearch # elasticsearch
yum install filebeat # filebeat
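
After the packages are installed, enable and start the services; a minimal sketch, assuming CentOS 7 with systemd:

systemctl daemon-reload
systemctl enable elasticsearch filebeat
systemctl start elasticsearch filebeat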

Elasticsearch

ES QueryString

http://127.0.0.1:9200/logstash-2017.10.17/testlog/_search?q=

  1. Full-text search: q=first
  2. Full-text search on a single field: q=user:prismcdn
  3. Exact match on a single field: q=user:"prismcdn"
  4. Combining conditions with NOT, AND, OR, ( and ), e.g. q=user:("prismcdn" OR "hello") AND NOT mesg:first
  5. Field existence: q=_exists_:user, q=_missing_:user
  6. Wildcards: ? for a single character, * for any number of characters, e.g. q=user:pri?m*
  7. Regular expressions: q=mesg:/mes{2}ages?/; regex queries perform poorly, avoid them where possible
  8. Fuzzy search: ~ means the word may be off by a letter or two and asks ES to rank results by similarity, e.g. q=user:first~
  9. Range search: works on both numbers and dates; [] means the endpoint is included, {} means it is excluded, e.g. q=rtt:>300, q=date:["now-6h" TO "now"}
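
The operators above can be combined in a single request; a small sketch against the example index at the top of this section (letting curl do the URL encoding):

curl -G 'http://127.0.0.1:9200/logstash-2017.10.17/testlog/_search?pretty' \
  --data-urlencode 'q=user:"prismcdn" AND rtt:>300'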

Index Operations

  1. Delete a specific object by name; the same DELETE verb works for indices and for ingest pipelines. For example, removing the prismcdn/xnode module's pipeline:
    # deletes all the field definitions of the prismcdn/xnode module's pipeline
    curl -X DELETE http://es-1.prismcdn.internal:9200/_ingest/pipeline/filebeat-5.6.3-prismcdn-xnode-pipeline
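
Deleting an index itself looks the same; a sketch, where the daily index name is just a made-up example:

    curl -X DELETE http://es-1.prismcdn.internal:9200/filebeat-6.0.1-2017.12.01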
    

Clustering Elasticsearch

1. Configure cluster discovery

A single line added to /etc/elasticsearch/elasticsearch.yml is enough for the node to be discovered by the cluster:

discovery.zen.ping.unicast.hosts: ["172.16.1.40", "172.16.1.44"]

Normally you also need to configure the minimum number of master-eligible nodes required to elect a master; to avoid split-brain this should be (number of master-eligible nodes / 2) + 1:

discovery.zen.minimum_master_nodes: 2
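
Putting the two settings together, a sketch of the cluster-related part of elasticsearch.yml for the two-node case above (cluster.name and node.name are made-up examples):

# /etc/elasticsearch/elasticsearch.yml
cluster.name: prismcdn-es        # made-up name; must be identical on every node
node.name: es-1                  # made-up name; unique per node
network.host: 172.16.1.40
discovery.zen.ping.unicast.hosts: ["172.16.1.40", "172.16.1.44"]
discovery.zen.minimum_master_nodes: 2   # (2 master-eligible nodes / 2) + 1 = 2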

2. Hot/cold data separation

Hot data here usually means data less than one day old. Since Elasticsearch typically creates one index per day, we treat the current day's data as hot and keep it on fast storage such as SSDs to speed up writes, while data older than a day is migrated to large-capacity spinning disks and kept long term as cold data.

  1. Tag the SSD nodes "hot" and the SAS-disk nodes "stale": in /etc/elasticsearch/elasticsearch.yml add node.attr.tag: hot on the SSD nodes and node.attr.tag: stale on the SAS nodes
  2. Make the index template require the hot tag by default. Since we use Filebeat here, this is set in the Filebeat configuration file as follows:

    # /etc/filebeat/filebeat.yml
    setup.template.settings:
      index.routing.allocation.require.tag: hot

  3. Check that the index template settings are correct
    curl http://es-1.prismcdn.internal:9200/_template | python -m json.tool
  4. If something is wrong, delete the index template and let it be recreated

    curl -X DELETE http://es-1.prismcdn.internal:9200/_template/filebeat-6.0.1

  5. Indices that have already been created will not be modified automatically; you can simply delete them

    curl -X DELETE http://es-1.prismcdn.internal:9200/filebeat-*

  6. Set up a scheduled job that changes the previous day's indices' tag from hot to stale (the resulting shard relocation can be watched with the _cat sketch right after this list)

    time=`date -d last-day "+%Y.%m.%d"`
    curl -X PUT -H "Content-Type: application/json" \
      "http://es-1.prismcdn.internal:9200/*-${time}/_settings?pretty" \
      -d '{ "index.routing.allocation.require.tag": "stale" }'
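
Once the tag requirement changes, Elasticsearch relocates the affected shards from the hot nodes to the stale nodes on its own; the move can be watched with the _cat API (the index name below is only an example):

curl 'http://es-1.prismcdn.internal:9200/_cat/shards/filebeat-6.0.1-2017.12.01?v'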

Filebeat

Pushing logs to ES in JSON format

# /etc/filebeat/filebeat.yml

filebeat.prospectors:

- input_type: log

  paths:
    - /opt/prismcdn/erepd/erep.log

  document_type: erep
  json.keys_under_root: true
  json.add_error_key: true

- input_type: log

  paths:
    - /opt/prismcdn/report-srv/logs/trep-*.log

  document_type: trep
  json.keys_under_root: true # JSON-formatted lines; add parsed fields at the event root
  json.add_error_key: true

processors:
- drop_fields:
    fields: ["pp.A"] # 不发送pp.A字段

output.elasticsearch:
  hosts: ["es-1.prismcdn.internal:9200"]

Processing custom file data

Creating a custom Filebeat module

  1. Install virtualenv: pip install virtualenv
  2. Clone the beats repository, switch into the filebeat directory, and run make create-fileset
  3. Enter the module name and the fileset name when prompted
  4. Configure config/xnode.yml, e.g.:
    type: log
    paths:
    {{ range $i, $path := .paths }}
    - {{$path}}
    {{ end }}
    exclude_files: [".gz$"]
    exclude_lines: []
    
  5. Configure ingest/pipeline.json, e.g. (the grok pattern can be tested with the _simulate API; see the sketch after this item):
    {
       "description": "Pipeline for parsing  xnode logs",
       "processors": [{
           "grok": {
               "field": "message",
               "patterns": [
                   "\\[%{DATA:xnode.time}\\]\\[%{DATA:xnode.level}\\]<%{DATA:xnode.type}> %{GREEDYDATA:xnode.message}"
               ],
               "ignore_missing": true
           }
       }, {
           "remove": {
               "field": "message"
           }
       }, {
           "date": {
               "field": "xnode.time",
               "target_field": "@timestamp",
               "formats": ["yyyy-MM-dd'T'HH:mm:ss.SSSZ"]
           }
       }, {
           "remove": {
               "field": "xnode.time"
           }
       }],
       "on_failure" : [{
           "set" : {
               "field" : "error.message",
               "value" : ""
           }
       }]
    }
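
Before wiring the pipeline into the module, the grok pattern can be exercised directly against a node with the _simulate API; a minimal sketch (the sample log line is invented):

curl -X POST -H 'Content-Type: application/json' \
  'http://es-1.prismcdn.internal:9200/_ingest/pipeline/_simulate?pretty' -d '
{
  "pipeline": {
    "processors": [{
      "grok": {
        "field": "message",
        "patterns": ["\\[%{DATA:xnode.time}\\]\\[%{DATA:xnode.level}\\]<%{DATA:xnode.type}> %{GREEDYDATA:xnode.message}"]
      }
    }]
  },
  "docs": [{ "_source": { "message": "[2017-10-17T08:00:00.000+0800][INFO]<peer> connected" } }]
}'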
    

A more complex example is the pipeline for nginx access logs:

{
  "description": "Pipeline for parsing Nginx access logs. Requires the geoip and user_agent plugins.",
  "processors": [{
    "grok": {
      "field": "message",
      "patterns":[
        "\"?%{IP_LIST:nginx.access.remote_ip_list} - %{DATA:nginx.access.user_name} \\[%{HTTPDATE:nginx.access.time}\\] \"%{WORD:nginx.access.method} %{DATA:nginx.access.url} HTTP/%{NUMBER:nginx.access.http_version}\" %{NUMBER:nginx.access.response_code} %{NUMBER:nginx.access.body_sent.bytes} \"%{DATA:nginx.access.referrer}\" \"%{DATA:nginx.access.agent}\""
        ],
      "pattern_definitions": {
        "IP_LIST": "%{IP}(\"?,?\\s*%{IP})*"
      },
      "ignore_missing": true
    }
  }, {
    "split": {
      "field": "nginx.access.remote_ip_list",
      "separator": "\"?,?\\s+"
    }
  }, {
    "script": {
      "lang": "painless",
      "inline": "boolean isPrivate(def ip) { try { StringTokenizer tok = new StringTokenizer(ip, '.'); int firstByte = Integer.parseInt(tok.nextToken());      int secondByte = Integer.parseInt(tok.nextToken());      if (firstByte == 10) {        return true;      }      if (firstByte == 192 && secondByte == 168) {        return true;      }      if (firstByte == 172 && secondByte >= 16 && secondByte <= 31) {        return true;      }      if (firstByte == 127) {        return true;      }      return false;    } catch (Exception e) {      return false;    }  }  def found = false;  for (def item : ctx.nginx.access.remote_ip_list) {    if (!isPrivate(item)) {      ctx.nginx.access.remote_ip = item;      found = true;      break;    }  }  if (!found) {    ctx.nginx.access.remote_ip = ctx.nginx.access.remote_ip_list[0];  }"
      }
  }, {
    "remove":{
      "field": "message"
    }
  }, {
    "rename": {
      "field": "@timestamp",
      "target_field": "read_timestamp"
    }
  }, {
    "date": {
      "field": "nginx.access.time",
      "target_field": "@timestamp",
      "formats": ["dd/MMM/YYYY:H:m:s Z"]
    }
  }, {
    "remove": {
      "field": "nginx.access.time"
    }
  }, {
    "user_agent": {
      "field": "nginx.access.agent",
      "target_field": "nginx.access.user_agent"
    }
  }, {
    "remove": {
      "field": "nginx.access.agent"
    }
  }, {
    "geoip": {
      "field": "nginx.access.remote_ip",
      "target_field": "nginx.access.geoip"
    }
  }],
  "on_failure" : [{
    "set" : {
      "field" : "error.message",
      "value" : ""
    }
  }]
}
  • On ingest pipelines, see https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html
  • For the grok processor, see https://www.elastic.co/guide/en/elasticsearch/reference/5.6/grok-processor.html#grok-basics
  • The built-in grok patterns can be browsed at https://github.com/elastic/elasticsearch/blob/master/libs/grok/src/main/resources/patterns
  • Date/time formats: https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-match

Debug

filebeat -e -d "*" -c xxx_config.yml

https://www.elastic.co/guide/en/beats/filebeat/current/enable-filebeat-debugging.html
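
To see exactly what would be sent without touching Elasticsearch, the output can also be switched to the console temporarily (comment out output.elasticsearch while this is active):

# filebeat.yml (debugging only)
output.console:
  pretty: true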

Note: field definitions in an ingest pipeline are not updated automatically; delete the corresponding pipeline in Elasticsearch so that it gets recreated.

# deletes all the field definitions of the prismcdn/xnode module's pipeline
curl -X DELETE http://es-1.prismcdn.internal:9200/_ingest/pipeline/filebeat-5.6.3-prismcdn-xnode-pipeline

Curator

Installation

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
echo '
[curator-5]
name=CentOS/RHEL 7 repository for Elasticsearch Curator 5.x packages
baseurl=http://packages.elastic.co/curator/5/centos/7
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1' >> /etc/yum.repos.d/elasticsearch.repo
yum install elasticsearch-curator

Configure curator.yml

# hosts, port and timeout
client:
  hosts:
    - es-1.prismcdn.internal
  port: 9200
  url_prefix:
  use_ssl: False
  certificate:
  client_cert:
  client_key:
  ssl_no_validate: False
  http_auth:
  timeout: 3600
  master_only: False

logging:
  loglevel: INFO
  logfile:
  logformat: default
  blacklist: ['elasticsearch', 'urllib3']

Deleting old data

# delete indices older than 14 days
/usr/bin/curator_cli --config /opt/prismcdn/curator.yml delete_indices --filter_list \
'[{"filtertype":"age","source":"creation_date","direction":"older","unit":"days","unit_count":14},{"filtertype":"pattern","kind":"prefix","value":"filebeat"}]' \
--ignore_empty_list
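
Before putting a destructive action like this into cron, curator_cli's global --dry-run flag is worth using; it logs what would be deleted without touching anything:

# preview only, deletes nothing
/usr/bin/curator_cli --config /opt/prismcdn/curator.yml --dry-run delete_indices --filter_list \
'[{"filtertype":"age","source":"creation_date","direction":"older","unit":"days","unit_count":14},{"filtertype":"pattern","kind":"prefix","value":"filebeat"}]'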

Force-merging data

# merge indices older than 1 day
/usr/bin/curator_cli --config /opt/prismcdn/curator.yml forcemerge --filter_list \
'[{"filtertype":"age","source":"creation_date","direction":"older","unit":"days","unit_count":1},{"filtertype":"pattern","kind":"prefix","value":"filebeat"}]' \
--ignore_empty_list --max_num_segments 1

Closing indices

# close indices older than 7 days
/usr/bin/curator_cli --config /opt/prismcdn/curator.yml close --filter_list \
'[{"filtertype":"age","source":"creation_date","direction":"older","unit":"days","unit_count":7},{"filtertype":"pattern","kind":"prefix","value":"filebeat"}]' \
--ignore_empty_list --delete_aliases
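
Taken together, the three jobs are typically driven by cron; a sketch of one possible schedule (file path and times are arbitrary):

# /etc/cron.d/curator
30 1 * * * root /usr/bin/curator_cli --config /opt/prismcdn/curator.yml forcemerge --filter_list '[{"filtertype":"age","source":"creation_date","direction":"older","unit":"days","unit_count":1},{"filtertype":"pattern","kind":"prefix","value":"filebeat"}]' --ignore_empty_list --max_num_segments 1
0 2 * * * root /usr/bin/curator_cli --config /opt/prismcdn/curator.yml close --filter_list '[{"filtertype":"age","source":"creation_date","direction":"older","unit":"days","unit_count":7},{"filtertype":"pattern","kind":"prefix","value":"filebeat"}]' --ignore_empty_list --delete_aliases
30 2 * * * root /usr/bin/curator_cli --config /opt/prismcdn/curator.yml delete_indices --filter_list '[{"filtertype":"age","source":"creation_date","direction":"older","unit":"days","unit_count":14},{"filtertype":"pattern","kind":"prefix","value":"filebeat"}]' --ignore_empty_list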

Monitoring

https://elkguide.elasticsearch.cn/elasticsearch/monitor/api/health.html
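
The simplest of the checks described there is the cluster health API, which reports green/yellow/red status in a single call:

curl 'http://es-1.prismcdn.internal:9200/_cluster/health?pretty'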

FAQ

  • Filebeat reports a mapper_parsing_exception 400 error

Check whether the index template needs to be updated; you can delete the old index template and let Filebeat run its template-creation step again.

  • Kibana reports FORBIDDEN/12/index read-only / allow delete (api) when updating index fields

This usually means Elasticsearch hit its disk flood-stage watermark and marked the index read-only; after freeing disk space, run the following in Kibana's Dev Tools:

PUT .kibana/_settings
{
    "index": {
        "blocks": {
            "read_only_allow_delete": "false"
        }
    }
}

Reference

  1. http://www.jianshu.com/p/f13a6dbb84ed
  2. https://elkguide.elasticsearch.cn/