ELK 的基礎配置與建設教學

Elasticsearch 的安裝與配置

Elasticsearch 會把 log 存到 index 裏面,通常會根據時間把同一來源的 log 存進相同前綴的不同 index,例如 blog-nginx-log-2024-10-07,而「blog-nginx-log-*」則稱之爲 index pattern,我們通常會在這一組 index pattern 裏面去搜索滿足特定條件的 log。

Elasticsearch 的安裝

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.14.3-amd64.deb
sudo dpkg -i elasticsearch-8.14.3-amd64.deb

安裝完成後,請妥善保存他提供的密碼:

如上圖,被紅色塗住的部分就是超級使用者 elastic 的密碼。

當然,如果忘記,我們也可以用下面的方式來重新產生密碼:

sudo /usr/share/elasticsearch/bin/elasticsearch-reset-password -u elastic

啓動 Elasticsearch 並設置開機時自動啓動:

sudo systemctl start elasticsearch
sudo systemctl enable elasticsearch

接着,我們就使用剛才產生的密碼,來發送第一個請求,來測試 Elasticsearch 是否正常運行。

curl -kX GET -u elastic:${pswd} "https://localhost:9200/"

當你得到類似下面的回應,就代表 Elasticsearch 建立成功:

{
  "name" : "elasticsearch",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "XXXXXXXXXXXXXXXX",
  "version" : {
    "number" : "8.14.3",
    "build_flavor" : "default",
    "build_type" : "deb",
    "build_hash" : "d55f984299e0e88dee72ebd8255f7ff130859ad0",
    "build_date" : "2024-07-07T22:04:49.882652950Z",
    "build_snapshot" : false,
    "lucene_version" : "9.10.0",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}

Elasticsearch 的配置

ILM 與 Index Template

基本上我們的需求是要維持 Log 半年的時間,因此這裏要介紹 ILM(Index Lifecycle Management) Policy。

ILM 可以幫我們刪除超過一定時間或大小的 index,例如如剛才的設定,超過半年的 index 就會被刪除,所以 blog-nginx-log-2024-04-22 將會自動被清理。

因此我們制訂一個超過 183 天的 Index 就會被刪除的 Policy:

curl -kX PUT -u elastic:${pswd} "https://localhost:9200/_ilm/policy/logs_policy" -H 'Content-Type: application/json' -d '
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_age": "30d"
          }
        }
      },
      "delete": {
        "min_age": "183d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
'

基本上 deletemin_age: 183d 很好理解,但 rollover 可能就比較陌生了。注意 URL,這個 policy 的名稱是 logs_policy

它是指每經過多少時間,我要創建一個新的 index,並且將之後的 log 都存入新的 index。

然而這件事情事實上也可以在 Logstash(或是其它 Log Collector)中做到,直接存入特定的 index,因此這邊加入這條其實只是針對於一些沒有特別設定的 index,例如 Kibana 的 alert log index。

然而,ILM 預設是不會套用在所有 Index 上的。 因此我們要引入一個新概念:Index Template:他可以幫助我們篩選出一些特定模板,然後套用 ILM:

curl -kX PUT -u elastic:${pswd} "https://localhost:9200/_template/all-index-template" -H 'Content-Type: application/json' -d '
{
  "index_patterns": ["*"],
  "settings": {
    "index.lifecycle.name": "logs_policy"
  }
}'

如果要套用 rollover 的規則,則可以使用下面的指令,設定之後它就會自動將 blog-nginx-log-alias 指向到最新的一個符合該 index pattern 的 index。也因此,他不能一次性設數個 index pattern,否則會發生混亂。

curl -kX PUT -u elastic:${pswd} "https://localhost:9200/_template/blog-nginx-index-template" -H 'Content-Type: application/json' -d '
{
  "index_patterns": ["blog-nginx-log-*"],
  "settings": {
    "index.lifecycle.rollover_alias": "blog-nginx-log-alias"
  }
}'

要注意的是,Elasticsearch 中的 index template 只會套用到未來新增的 index,並不會改變既有的 index,如果我們要針對既有的 index 進行 ILM 的套用,那我們需要採用下面的指令:

curl -kX PUT -u elastic:${pswd}  "https://localhost:9200/{your-index}/_settings" -H 'Content-Type: application/json' -d '
{
  "index.lifecycle.name": "{policy}",
  "index.lifecycle.rollover_alias": "{your-index-alias}"
}
'
Cluster

Cluster 一直是 HA 很重要的基礎,也是在大流量情況的最好選擇。

Elasticsearch 會自動調整資源分配,包括硬碟資源,這也意味着說,當一個節點硬碟快滿的時候,他會被停止儲存更多資料,而且其內部資料可能會被轉移到其它節點來減輕其負擔。

而加入了 Elasticsearch Cluster 的節點,若要退出實際操作也不難,只需要遵循幾個步驟即可安全的把資料轉移到其它節點,就可以優雅的下線。

因此可以根據需求,事後再加入新的節點。

Cluster 建立

假設 node1 的 IP 位置為 192.168.1.1,且 node2 的 IP 位置為 192.168.1.2

編輯 node1 文件 /etc/elasticsearch/elasticsearch.yml,修改以下配置:

cluster.name: test-cluster
node.name: node-1
discovery.seed_hosts: ["192.168.1.1", "192.168.1.2"]
cluster.initial_master_nodes: ["node-1", "node-2"]
transport.host: 0.0.0.0

編輯 node2 文件 /etc/elasticsearch/elasticsearch.yml,修改以下配置:

cluster.name: test-cluster
node.name: node-2
discovery.seed_hosts: ["192.168.1.1", "192.168.1.2"]
cluster.initial_master_nodes: ["node-1", "node-2"]
transport.host: 0.0.0.0

其中的 Discovery Seed Hosts 代表全部的節點,而 Initial Master Nodes 代表 Cluster 在初始化的時候 Master 要從哪些節點中選出來(因此盡量不要選一個之後可能會退出的節點)。

最後開啓 Elasticsearch 服務即可。

注意:當 Elasticsearch 服務一被開啓,他就會建立一座 Cluster,因此假設在完成建立、加入操作之前,不要開啓 Elasticsearch 服務。 注意:你需要確保每台 Elasticsearch 主機信任彼此的憑證,并且透過以下指令,來設置 Elasticsearch 讀取憑證的密碼:

/usr/share/elasticsearch/bin/elasticsearch-keystore add xpack.security.transport.ssl.keystore.secure_password
/usr/share/elasticsearch/bin/elasticsearch-keystore add xpack.security.http.ssl.keystore.secure_password

注意:你也可以透過以下指令移除 secure_password,來 hardcode 憑證密碼進 config 檔:

/usr/share/elasticsearch/bin/elasticsearch-keystore remove xpack.security.transport.ssl.keystore.secure_password
/usr/share/elasticsearch/bin/elasticsearch-keystore remove xpack.security.http.ssl.keystore.secure_password

Logstash 與 Elasticsearch

Logstash 的安裝與配置
sudo wget https://artifacts.elastic.co/downloads/logstash/logstash-8.14.1-amd64.deb
sudo dpkg -i logstash-8.14.1-amd64.deb

然後我們新建一個 /etc/logstash/conf.d/my.conf,並且輸入:

input {
  syslog {
    port => 10514
    host => "0.0.0.0"
  }
}

output {
  elasticsearch {
    hosts => ["https://{elastic-ip}:9200"]
    data_stream => true
    data_stream_type => "logs"
    data_stream_dataset => "blog-nginx"
    data_stream_namespace => "dev"
    ssl_enabled => true
    ssl_certificate_verification => false
    user => "elastic"
    password => "{your-password-for-elastic}"

  }
  stdout { codec => rubydebug }
}

它會監聽 514 port,在 nginx 的機器上面配置 rsyslog,將其收集到的 nginx log 發送給 logstash server 即可。

那安裝、調整完,就要啓動加開機啓用 Logstash:

sudo systemctl start logstash
sudo systemctl enable logstash

我們可以用以下指令測試:

logger -n 127.0.0.1 -P 10514 "This is a test log message"

之後查看 Elasticsearch 的狀況:

$ curl -kX GET -u elastic:${pswd} "https://localhost:9200/_cat/indices?v"
health status index                                      uuid                   pri rep docs.count docs.deleted store.size pri.store.size dataset.size
yellow open   .ds-logs-blog-nginx-dev-2024.10.07-000001  W4RTfT3_RBWW1Q5UhAeUyg   1   1         10            0     27.7kb         27.7kb       27.7kb

發現真的進去了。

基本上你要針對每一個服務,都新增一個 conf 檔。

接着如果需要高效能,我們還需要調整他 Worker 數和記憶體大小。

/etc/logstash/logstash.yml 中可以修改 pipeline.workers,它代表會開多少個 threads 來處理請求。

而在 /etc/logstash/jvm.options 中,我們可以修改他的 jvm 記憶體大小設定。

設定 Nginx 主機
sudo vim /etc/rsyslog.d/50-nginx.conf

輸入以下:

module(load="imtcp")
module(load="imudp")
input(type="imtcp" port="514")
input(type="imudp" port="514")

template(name="LogstashFormat" type="string" string="<%PRI%>%TIMESTAMP:::date-rfc3339% %HOSTNAME% %syslogtag%%msg%\n")

if ($programname == 'nginx') then {
  action(type="omfwd" target="logstash-server-ip" port="10514" protocol="tcp")
}

接着,你還需要讓 Nginx 主動把 log 送進入 rsyslog,這部分可以參考 Nginx - Logging to syslog 這篇文章。

要注意的是 Nginx 設定裏面的 tag 要和 rsyslog 設定裏面的 $programname 值相同,他 action 才會被觸發。

至此,Elasticsearch 和 Logstash 算是串接成功,也都算是粗淺的介紹完了。

Kibana 的安裝與使用

Kibana 和 Elasticsearch 是嚴格綁定的,他們的版本必須完全一致,因此上面的 Elasticsearch 使用 v8.14.3,此處 Kibana 也應該使用 v8.14.3。

Kibana 的安裝

sudo apt-get install gpg apt-transport-https -y
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list
sudo apt-get update && sudo apt-get install kibana=8.14.3 -y

接着編輯 /etc/kibana/kibana.yml,找到下列選項並修改:

開啓並啓用 Kibana 服務:

sudo systemctl start kibana
sudo systemctl enable kibana

理論上你就可以藉由瀏覽器訪問 http://{kibana-ip}:5601 來查看 Kibana 頁面,此時有兩種設定方式,一種是手動,另一種是由 Elasticsearch 產生 Enrollment token。

Enrollment token

在 Elasticsearch 的主機上下這條指令:

 sudo /usr/share/elasticsearch/bin/elasticsearch-create-enrollment-token -s kibana --url "https://{elastic-external-ip}:9200"

{external-ip} 意味着你不能填寫 127.0.0.1 之類的 loopback interface IP 位置,而應該填寫如果 Kibana 要訪問,他應該要使用的那個 IP 位置。

將輸出的 enrollment token 輸入到 Kibana 後,他會要求你輸入驗證碼,如圖:

此時你需要回到 Kibana 那臺主機,下:sudo systemctl status kibana

就會得到這樣子的字串:Oct 08 02:52:41 kibana kibana[1863]: Your verification code is: XXX XXX,把驗證碼輸入進去

之後,就可以使用 elastic 的帳號密碼登入了。

Kibana Encryption Keys For Secure Saved Objects

如果 Kibana 要保存敏感資料,例如 Alert 行爲,那麼爲了保證安全性,Kibana 會要求加密。

因此你必須設定加密用的金鑰,才能保證一些功能運作正常。

爲了產生金鑰,可以使用下面的指令:

sudo /usr/share/kibana/bin/kibana-encryption-keys generate

他會產生這樣子的結果:

我們將它們直接貼到 /etc/kibana/kibana.yml 的最底下去即可。

Kibana 的操作

可視化圖標

進入主頁後,點擊中間最右方的 Analytics:

點擊 Create data view:

填寫名稱、Index pattern:

選擇你想要的模式,基本上這裏我選擇 Dashboard。

然後就可以創建 Dashboard,再創建 Visualization 了。

這是創建 Visualization 的畫面:

最簡單的情況,我們只需要把滑鼠放在左邊:

點擊那個 $\oplus$ 符號,一張簡易的圖表就做好了:

之後別忘了儲存 Visualization,再儲存 Dashboard。

重新回到 Kibana 之後,我們可以點擊左上角 Kibana 下方的下拉選單,即可查看 Dashboard:

Alert Rule 警告規則

在 Kibana 主頁,同樣地,我們點擊左上角 Kibana 下方的下拉選單,滑到最下方的 Management,點擊其中細項 Stack Management

進入 Stack Management 頁面,我們可以在左邊看到 Alerts and Insights(如果你看不到,請確認 Kibana 下方的下拉選單是關閉的),點擊其中細項 Alerts,之後右上角有一個 Manage Ruls 的鏈接,點下去之後我們就可以新增規則了。

在新增規則時,它會先要求你選擇規則類別,此時我們選擇 Elasticsearch Query 即可:

最上方會讓你輸入規則名字和標籤:

中間讓你選擇你規則觸發條件 Query 的語法格式,我這裏選擇 DSL,也就是我平常操作 Elasticsearch 的方式:

選擇了 DSL 之後,就有四大項目要填:

最上方的 Select indices:填寫你要監控的 Index Pattern 以及該 Index Pattern 代表時間的欄位是什麼,因爲我設定 Nginx 傳輸時間的欄位叫做 @timestamp,因此我這裏選擇 @timestamp

第二個要你填寫 DSL,也就是 Query 的內容,我填寫的是:

{
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "@timestamp": {
              "gte": "now-15m",
              "lte": "now"
            }
          }
        },
        {
          "regexp": {
            "message": ".*404.*"
          }
        }
      ]
    }
  }
}

基本上就是篩選出 log 訊息內有 404 字樣且發生在 15 分鐘內的資料。

第三項是要你填寫觸發規則,我填寫的是:當過去五分鐘內的全部內容的總數量超過 3 的時候觸發。

第四項指的是有多少條查詢得到的記錄會被送發給 Rule Action(可以理解爲 Rule 觸發後會執行的行爲)。

那這邊有個小問題,爲什麼我規則已經寫了只要算過去 5 分鐘,我 DSL 還要特別篩選 15 分鐘呢?

按照個人理解,這應該可以減輕 Elasticsearch 和 Kibana 的負擔,因爲不需要處理那麼多不必要資訊;當然他們也有可能有對此進行優化,但我個人是保有這個習慣:永遠不會去請求不需要的資料。

接着我們可以按下下方的 Test Query 來測試 DSL 語法,他會告訴你查詢結果,如:Query matched 0 documents in the last 5m.

最後一項就是 Action,剛才稍微介紹過,就是 Rule 的條件被觸發後,Kibana 系統會採取的行爲,一個 Rule 可能會對應多個 Actions:

由於我們這邊要和 Python 腳本做搭配,因此我們選擇 Index,它就會把 Alert 資訊寫回 Elasticsearch,基本上選擇 Index 之後,他會要我們先創建一個 Index Connector。Index Connector 就是說,要把 Alert 資料寫到哪個 Index:

我們還可以再做一些細微的調整,例如狀態變更時寄信,還是每次檢查都寄信。

最後,我們可以修改這裏的 Document to index,基本上它決定了你要寫回 Elasticsearch 的內容。具體的設定可以參考 Elastic - Index document example

再發送數次的 404 請求之後,我們成功的在 alert-index 查詢到了 1 個 alert:

$ curl -kX GET -u elastic:${pswd} "https://localhost:9200/alert-index/_search?pretty"     
{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "alert-index",
        "_id" : "52VRapIB9oE_vdCqLo1N",
        "_score" : 1.0,
        "_source" : {
          "rule_id" : "d6edc773-8437-4ad2-8cf2-5245b4cc24db",
          "rule_name" : "nginx-log-checker",
          "alert_id" : "query matched",
          "context_message" : "Document count is 8 in the last 5m in logs-blog-nginx-dev* index. Alert when greater than 3."
        }
      }
    ]
  }
}

但這其實不太足以使用,稍作修改,我們將 Document to index 改成如下:

{
  "@timestamp": "{{date}}",
  "rule_id": "{{rule.id}}",
  "rule_name": "{{rule.name}}",
  "alert_id": "{{alert.id}}",
  "context_message": "{{context.message}}"
}

然後回到 Elasticsearch 主機,新增一條 index template:

curl -kX PUT -u elastic:${pswd} "https://localhost:9200/_template/alert-index-template" -H 'Content-Type: application/json' -d '
{
  "index_patterns": ["alert-*"],
  "mappings": {
    "dynamic_templates": [
      {
        "timestamps_as_date": {
          "match": "@timestamp",
          "mapping": {
            "type": "date"
          }
        }
      }
    ]
  }
}'

這個 index template 的功能是將 @timestamp 欄位的型別轉換成 date,因此才允許進行時間段的查詢。

如此,新產生的 log 就可以讓我們方便的查詢了:

Kibana 串 Python Bot

在剛剛的 Alert Action 中,你可以看到 Email 選項,並且它是不可選擇的,因爲只有付費版才能使用 Email Connector,因此爲了及時得知 Alert,我們選擇了將 Alert 寫回 Elasticsearch,並且用 Python 去監控該 index。

這裏撰寫了一個簡單的 Python Bot 模板:

import requests
import json
import schedule
import time
import urllib3
from datetime import datetime

alert_index="alert-index"
elastic = "https://localhost:9200"
user = "elastic"
pswd = ""
interval = 10

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def send_alert(log):
    print("ALERT: Detected 500 error log!")
    print(json.dumps(log, indent=4))

def query_elasticsearch():
    url = f"{elastic}/{alert_index}/_search?pretty"
    headers = {
        'Content-Type': 'application/json'
    }
    query = {
        "query": {
            "range": {
                "@timestamp": {
                    "gte": f"now-{interval+2}s"
                }
            }
        },
        "sort": [
            {
                "@timestamp": {
                    "order": "desc"
                }
            }
        ]
    }

    response = requests.get(
            url,
            headers=headers,
            data=json.dumps(query),
            auth=requests.auth.HTTPBasicAuth(user, pswd),
            verify=False
    )

    if response.status_code == 200:
        results = response.json()
        hits = results.get('hits', {}).get('hits', [])
        if hits:
            print(f"Logs from the last 30 seconds ({datetime.now()}):")
            for hit in hits:
                log = hit["_source"]
                send_alert(log)
    else:
        print(f"Failed to fetch logs: {response.status_code} - {response.text}")


print("Starting log query script. Press Ctrl+C to exit.")

try:
    while True:
        query_elasticsearch()
        time.sleep(interval)
except KeyboardInterrupt:
    print("Script terminated by user.")

只要修改 send_alert,就可以將它改成發送 Email、發送 Line 或是 Telegram 訊息。

這裏以 Line Bot 爲例,我們使用 Flask 寫一個簡易 API Server:

import json
import requests
from flask import Flask, request, jsonify

ChannelAccessToken = '' # Place your access token here.
USER_IDS_FILE = 'user_ids.txt'
SECRET_TOKEN = 'abcd1234' # Place a password here.

app = Flask(__name__)

@app.route('/', methods=['GET', 'POST'])
def index():
    return 'Hello, World!', 200

@app.route('/webhook', methods=['POST'])
def webhook():
    data = request.get_json()

    for event in data['events']:
        userId = event['source']['userId']
        userMessage = event['message']['text']

        if userMessage == SECRET_TOKEN:
            print(f"Received special message from user {userId}: {userMessage}")
            with open(USER_IDS_FILE, 'a') as f:
                f.write(f"{userId}\n")
            print(f"Recorded userId: {userId}")

    return 'OK', 200

@app.route('/send_alert', methods=['POST'])
def send_alert():
    data = request.get_json()

    if 'message' not in data:
        return jsonify({"error": "Missing 'message' in request body"}), 400

    alert_message = data['message']

    try:
        with open(USER_IDS_FILE, 'r') as f:
            user_ids = f.readlines()
        
        user_ids = [uid.strip() for uid in user_ids if uid.strip()]

        if not user_ids:
            return jsonify({"error": "No user IDs found"}), 404

        for userId in user_ids:
            payload = {
                "to": userId,
                "messages": [
                    {
                        "type": "text",
                        "text": alert_message
                    }
                ]
            }

            headers = {
                'Content-Type': 'application/json',
                'Authorization': f'Bearer {ChannelAccessToken}'
            }

            response = requests.post('https://api.line.me/v2/bot/message/push', headers=headers, data=json.dumps(payload))

            if response.status_code != 200:
                print(f"Failed to send message to {userId}: {response.text}")

        return jsonify({"status": "Alert sent to all users"}), 200

    except FileNotFoundError:
        return jsonify({"error": "User ID file not found"}), 404

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=80, debug=True)

這裏的 ChannelAccessToken 需要去 Line 官方申請,申請可以參考這篇文章 iTHome - LINE Bot 帳號申請。 接着修改 send_alert 函式爲下:


line_bot_api = "http://localhost/send_alert"

def send_alert(message):

    payload = {
        "message": message
    }

    headers = {
        "Content-Type": "application/json"
    }

    try:
        response = requests.post(line_bot_api, headers=headers, data=json.dumps(payload))

        print(f"Status Code: {response.status_code}")
        print(f"Response: {response.text}")

    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")

他的運作原理就是使用者發送特殊的祕密口令給 Line Bot,Line Bot 就會將使用者 ID 搜錄到檔案裏面;當 send_alert 被呼叫的時候,Line Bot 就會把 log 訊息發送給所有記錄在冊的使用者。

效能評測與可擴充性

測試環境

主機:

每一臺虛擬機的配置皆如下:

測試時是在主機空閒時候。

測試結果

可以以此爲基礎來配置、部署你的 ELK。

前面已經提及了 Elasticsearch Cluster 的可擴充性,雖然說 Logstash 沒有必要組成 Cluster,但必要的增加 Logstash 來分散壓力也不失爲一種好的選擇,不過這都要經由 Log 量來進行分析。

未來可以做的東西