首頁 > 軟體

通過 Filebeat 收集 Ubuntu 系統紀錄檔

2020-06-16 16:33:50

需求:收集 Ubuntu 系統紀錄檔,傳送給 logstash,再由 logstash 把資料傳遞給 elasticsearch,最後通過 kibana 展示紀錄檔資料。

Filebeat System Module

Filebeat Modules 可以簡化常見紀錄檔格式的收集、解析和視覺化。一個典型的模組(例如,對於 Nginx 紀錄檔)由一個或多個紀錄檔檔案(fileset)組成(對於 Nginx 來說,預設是 access.log 和 error.log)。這裡我們可以使用 Filebeat 的 System Module 完成 ubuntu 的系統紀錄檔。

下面介紹設定 System Module 的步驟(假如你已經安裝好了 Filebeat)。

啟用 System Module
Filebeat 支援的模組預設都是未啟用的,我們可以通過下面的方式啟用模組。找到 filebeat 程式,執行 moudles enable 命令:

$ sudo ./filebeat modules enable system

上面的命令啟用了 system 模組,用下面的命令可以檢視當前已經啟用的模組有哪些:

$ sudo ./filebeat modules list

把資料傳送給 logstash
設定 Filebeat 將紀錄檔行傳送到 Logstash。要做到這一點,在組態檔 filebeat.yml 中禁用 Elasticsearch 輸出,並啟用 Logstash 輸出:

#output.elasticsearch:
  #hosts: ["xxx.xxx.xxx.xxx:9200"]
output.logstash:
  hosts: ["xxx.xxx.xxx.xxx:5044"]

重新啟動 filebeat 服務

$ sudo systemctl restart filebeat.service

設定 Logstash 處理資料

要讓 logstash 接受 Filebeat System Module 傳送來的資料還是有些難度的,至少我們需要一個看上去有點複雜的設定

input {
  beats {
    port => 5064
    host => "0.0.0.0"
  }
}
filter {
  if [fileset][module] == "system" {
    if [fileset][name] == "auth" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:[%{POSINT:[system][auth][pid]}])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:[%{POSINT:[system][auth][pid]}])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:[%{POSINT:[system][auth][pid]}])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:[%{POSINT:[system][auth][pid]}])?: s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:[%{POSINT:[system][auth][pid]}])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:[%{POSINT:[system][auth][pid]}])?: new user: name=%{DATA:[system][auth][user][add][name]}, UID=%{NUMBER:[system][auth][user][add][uid]}, GID=%{NUMBER:[system][auth][user][add][gid]}, home=%{DATA:[system][auth][user][add][home]}, shell=%{DATA:[system][auth][user][add][shell]}$",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:[%{POSINT:[system][auth][pid]}])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
        pattern_definitions => {
          "GREEDYMULTILINE"=> "(.|n)*"
        }
        remove_field => "message"
      }
      date {
        match => [ "[system][auth][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
      geoip {
        source => "[system][auth][ssh][ip]"
        target => "[system][auth][ssh][geoip]"
      }
    }
    else if [fileset][name] == "syslog" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:[%{POSINT:[system][syslog][pid]}])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
        pattern_definitions => { "GREEDYMULTILINE" => "(.|n)*" }
        remove_field => "message"
      }
      date {
        match => [ "[system][syslog][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
    }
  }
}
output {
  elasticsearch {
    hosts => xxx.xxx.xxx.xxx
    manage_template => false
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}

處理時區問題
看到這段設定我多麼希望它能夠直接工作啊!不幸的是它並不能很好的工作,至少在我的 ubuntu 18.04 上不行。問題的核心是無論 auth.log 還是 syslog,記錄的都是本地時區的區時:

而上面的設定中把這些時間都當成 UTC 時間來處理了。搞清楚了原因,糾正起來就很容易了,在 date 外掛中新增原生的時區資訊就可以了。比如筆者所在時區為東八區,那麼就分別在兩個 date 的設定中新增下面的資訊:

timezone => "Asia/Chongqing"

讓獨立的 pipeline 處理該資料流
下面建立一個新的目錄 /etc/logstash/myconf.d,並在 /etc/logstash/myconf.d 目錄下建立 Logstash 組態檔 krtest.conf。然後在 /etc/logstash/pipelines.yml 檔案中新增新的 pipeline 設定:
- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"
- pipeline.id: krtest
  path.config: "/etc/logstash/myconf.d/krtest.conf"
其中 pipeline.id 為 main 的管道是預設的設定,我們新新增了 id 為 krtest 的管道並指定了對應的組態檔路徑。把上面的設定寫入到 /etc/logstash/myconf.d/krtest.conf 檔案中。然後重新啟動 logstash 服務:

$ sudo systemctl restart logstash.service

在 Kibana 中檢視紀錄檔

最後在 kibana 中新增 filebeat 開頭的 index pattern,就可以通過圖形介面檢視 ubuntu 的系統紀錄檔了:


IT145.com E-mail:sddin#qq.com