ELK 的基礎配置與建設教學
Elasticsearch 的安裝與配置
Elasticsearch 會把 log 存到 index 裏面,通常會根據時間把同一來源的 log 存進相同前綴的不同 index,例如 blog-nginx-log-2024-10-07,而「blog-nginx-log-*」則稱之爲 index pattern,我們通常會在這一組 index pattern 裏面去搜索滿足特定條件的 log。
Elasticsearch 的安裝
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.14.3-amd64.deb
sudo dpkg -i elasticsearch-8.14.3-amd64.deb
安裝完成後,請妥善保存他提供的密碼:
如上圖,被紅色塗住的部分就是超級使用者 elastic
的密碼。
當然,如果忘記,我們也可以用下面的方式來重新產生密碼:
sudo /usr/share/elasticsearch/bin/elasticsearch-reset-password -u elastic
啓動 Elasticsearch 並設置開機時自動啓動:
sudo systemctl start elasticsearch
sudo systemctl enable elasticsearch
接着,我們就使用剛才產生的密碼,來發送第一個請求,來測試 Elasticsearch 是否正常運行。
curl -kX GET -u elastic:${pswd} "https://localhost:9200/"
當你得到類似下面的回應,就代表 Elasticsearch 建立成功:
{
"name" : "elasticsearch",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "XXXXXXXXXXXXXXXX",
"version" : {
"number" : "8.14.3",
"build_flavor" : "default",
"build_type" : "deb",
"build_hash" : "d55f984299e0e88dee72ebd8255f7ff130859ad0",
"build_date" : "2024-07-07T22:04:49.882652950Z",
"build_snapshot" : false,
"lucene_version" : "9.10.0",
"minimum_wire_compatibility_version" : "7.17.0",
"minimum_index_compatibility_version" : "7.0.0"
},
"tagline" : "You Know, for Search"
}
Elasticsearch 的配置
ILM 與 Index Template
基本上我們的需求是要維持 Log 半年的時間,因此這裏要介紹 ILM(Index Lifecycle Management) Policy。
ILM 可以幫我們刪除超過一定時間或大小的 index,例如如剛才的設定,超過半年的 index 就會被刪除,所以 blog-nginx-log-2024-04-22 將會自動被清理。
因此我們制訂一個超過 183 天的 Index 就會被刪除的 Policy:
curl -kX PUT -u elastic:${pswd} "https://localhost:9200/_ilm/policy/logs_policy" -H 'Content-Type: application/json' -d '
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_age": "30d"
}
}
},
"delete": {
"min_age": "183d",
"actions": {
"delete": {}
}
}
}
}
}
'
基本上 delete
的 min_age: 183d
很好理解,但 rollover
可能就比較陌生了。注意 URL,這個 policy 的名稱是 logs_policy
。
它是指每經過多少時間,我要創建一個新的 index,並且將之後的 log 都存入新的 index。
然而這件事情事實上也可以在 Logstash(或是其它 Log Collector)中做到,直接存入特定的 index,因此這邊加入這條其實只是針對於一些沒有特別設定的 index,例如 Kibana 的 alert log index。
然而,ILM 預設是不會套用在所有 Index 上的。 因此我們要引入一個新概念:Index Template:他可以幫助我們篩選出一些特定模板,然後套用 ILM:
curl -kX PUT -u elastic:${pswd} "https://localhost:9200/_template/all-index-template" -H 'Content-Type: application/json' -d '
{
"index_patterns": ["*"],
"settings": {
"index.lifecycle.name": "logs_policy"
}
}'
如果要套用 rollover 的規則,則可以使用下面的指令,設定之後它就會自動將 blog-nginx-log-alias
指向到最新的一個符合該 index pattern 的 index。也因此,他不能一次性設數個 index pattern,否則會發生混亂。
curl -kX PUT -u elastic:${pswd} "https://localhost:9200/_template/blog-nginx-index-template" -H 'Content-Type: application/json' -d '
{
"index_patterns": ["blog-nginx-log-*"],
"settings": {
"index.lifecycle.rollover_alias": "blog-nginx-log-alias"
}
}'
要注意的是,Elasticsearch 中的 index template 只會套用到未來新增的 index,並不會改變既有的 index,如果我們要針對既有的 index 進行 ILM 的套用,那我們需要採用下面的指令:
curl -kX PUT -u elastic:${pswd} "https://localhost:9200/{your-index}/_settings" -H 'Content-Type: application/json' -d '
{
"index.lifecycle.name": "{policy}",
"index.lifecycle.rollover_alias": "{your-index-alias}"
}
'
Cluster
Cluster 一直是 HA 很重要的基礎,也是在大流量情況的最好選擇。
Elasticsearch 會自動調整資源分配,包括硬碟資源,這也意味着說,當一個節點硬碟快滿的時候,他會被停止儲存更多資料,而且其內部資料可能會被轉移到其它節點來減輕其負擔。
而加入了 Elasticsearch Cluster 的節點,若要退出實際操作也不難,只需要遵循幾個步驟即可安全的把資料轉移到其它節點,就可以優雅的下線。
因此可以根據需求,事後再加入新的節點。
Cluster 建立
假設 node1 的 IP 位置為 192.168.1.1
,且 node2 的 IP 位置為 192.168.1.2
。
編輯 node1 文件 /etc/elasticsearch/elasticsearch.yml
,修改以下配置:
cluster.name: test-cluster
node.name: node-1
discovery.seed_hosts: ["192.168.1.1", "192.168.1.2"]
cluster.initial_master_nodes: ["node-1", "node-2"]
transport.host: 0.0.0.0
編輯 node2 文件 /etc/elasticsearch/elasticsearch.yml
,修改以下配置:
cluster.name: test-cluster
node.name: node-2
discovery.seed_hosts: ["192.168.1.1", "192.168.1.2"]
cluster.initial_master_nodes: ["node-1", "node-2"]
transport.host: 0.0.0.0
其中的 Discovery Seed Hosts 代表全部的節點,而 Initial Master Nodes 代表 Cluster 在初始化的時候 Master 要從哪些節點中選出來(因此盡量不要選一個之後可能會退出的節點)。
最後開啓 Elasticsearch 服務即可。
注意:當 Elasticsearch 服務一被開啓,他就會建立一座 Cluster,因此假設在完成建立、加入操作之前,不要開啓 Elasticsearch 服務。 注意:你需要確保每台 Elasticsearch 主機信任彼此的憑證,并且透過以下指令,來設置 Elasticsearch 讀取憑證的密碼:
/usr/share/elasticsearch/bin/elasticsearch-keystore add xpack.security.transport.ssl.keystore.secure_password
/usr/share/elasticsearch/bin/elasticsearch-keystore add xpack.security.http.ssl.keystore.secure_password
注意:你也可以透過以下指令移除 secure_password
,來 hardcode 憑證密碼進 config 檔:
/usr/share/elasticsearch/bin/elasticsearch-keystore remove xpack.security.transport.ssl.keystore.secure_password
/usr/share/elasticsearch/bin/elasticsearch-keystore remove xpack.security.http.ssl.keystore.secure_password
Logstash 與 Elasticsearch
Logstash 的安裝與配置
sudo wget https://artifacts.elastic.co/downloads/logstash/logstash-8.14.1-amd64.deb
sudo dpkg -i logstash-8.14.1-amd64.deb
然後我們新建一個 /etc/logstash/conf.d/my.conf
,並且輸入:
input {
syslog {
port => 10514
host => "0.0.0.0"
}
}
output {
elasticsearch {
hosts => ["https://{elastic-ip}:9200"]
data_stream => true
data_stream_type => "logs"
data_stream_dataset => "blog-nginx"
data_stream_namespace => "dev"
ssl_enabled => true
ssl_certificate_verification => false
user => "elastic"
password => "{your-password-for-elastic}"
}
stdout { codec => rubydebug }
}
它會監聽 514 port,在 nginx 的機器上面配置 rsyslog,將其收集到的 nginx log 發送給 logstash server 即可。
那安裝、調整完,就要啓動加開機啓用 Logstash:
sudo systemctl start logstash
sudo systemctl enable logstash
我們可以用以下指令測試:
logger -n 127.0.0.1 -P 10514 "This is a test log message"
之後查看 Elasticsearch 的狀況:
$ curl -kX GET -u elastic:${pswd} "https://localhost:9200/_cat/indices?v"
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size dataset.size
yellow open .ds-logs-blog-nginx-dev-2024.10.07-000001 W4RTfT3_RBWW1Q5UhAeUyg 1 1 10 0 27.7kb 27.7kb 27.7kb
發現真的進去了。
基本上你要針對每一個服務,都新增一個 conf 檔。
接着如果需要高效能,我們還需要調整他 Worker 數和記憶體大小。
在 /etc/logstash/logstash.yml
中可以修改 pipeline.workers
,它代表會開多少個 threads 來處理請求。
而在 /etc/logstash/jvm.options
中,我們可以修改他的 jvm 記憶體大小設定。
設定 Nginx 主機
sudo vim /etc/rsyslog.d/50-nginx.conf
輸入以下:
module(load="imtcp")
module(load="imudp")
input(type="imtcp" port="514")
input(type="imudp" port="514")
template(name="LogstashFormat" type="string" string="<%PRI%>%TIMESTAMP:::date-rfc3339% %HOSTNAME% %syslogtag%%msg%\n")
if ($programname == 'nginx') then {
action(type="omfwd" target="logstash-server-ip" port="10514" protocol="tcp")
}
接着,你還需要讓 Nginx 主動把 log 送進入 rsyslog,這部分可以參考 Nginx - Logging to syslog 這篇文章。
要注意的是 Nginx 設定裏面的 tag 要和 rsyslog 設定裏面的 $programname 值相同,他 action 才會被觸發。
至此,Elasticsearch 和 Logstash 算是串接成功,也都算是粗淺的介紹完了。
Kibana 的安裝與使用
Kibana 和 Elasticsearch 是嚴格綁定的,他們的版本必須完全一致,因此上面的 Elasticsearch 使用 v8.14.3,此處 Kibana 也應該使用 v8.14.3。
Kibana 的安裝
sudo apt-get install gpg apt-transport-https -y
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list
sudo apt-get update && sudo apt-get install kibana=8.14.3 -y
接着編輯 /etc/kibana/kibana.yml
,找到下列選項並修改:
server.host
修改爲"0.0.0.0"
開啓並啓用 Kibana 服務:
sudo systemctl start kibana
sudo systemctl enable kibana
理論上你就可以藉由瀏覽器訪問 http://{kibana-ip}:5601
來查看 Kibana 頁面,此時有兩種設定方式,一種是手動,另一種是由 Elasticsearch 產生 Enrollment token。
Enrollment token
在 Elasticsearch 的主機上下這條指令:
sudo /usr/share/elasticsearch/bin/elasticsearch-create-enrollment-token -s kibana --url "https://{elastic-external-ip}:9200"
{external-ip}
意味着你不能填寫 127.0.0.1
之類的 loopback interface IP 位置,而應該填寫如果 Kibana 要訪問,他應該要使用的那個 IP 位置。
將輸出的 enrollment token 輸入到 Kibana 後,他會要求你輸入驗證碼,如圖:
此時你需要回到 Kibana 那臺主機,下:sudo systemctl status kibana
就會得到這樣子的字串:Oct 08 02:52:41 kibana kibana[1863]: Your verification code is: XXX XXX
,把驗證碼輸入進去
之後,就可以使用 elastic 的帳號密碼登入了。
Kibana Encryption Keys For Secure Saved Objects
如果 Kibana 要保存敏感資料,例如 Alert 行爲,那麼爲了保證安全性,Kibana 會要求加密。
因此你必須設定加密用的金鑰,才能保證一些功能運作正常。
爲了產生金鑰,可以使用下面的指令:
sudo /usr/share/kibana/bin/kibana-encryption-keys generate
他會產生這樣子的結果:
我們將它們直接貼到 /etc/kibana/kibana.yml
的最底下去即可。
Kibana 的操作
可視化圖標
進入主頁後,點擊中間最右方的 Analytics:
點擊 Create data view:
填寫名稱、Index pattern:
選擇你想要的模式,基本上這裏我選擇 Dashboard。
然後就可以創建 Dashboard,再創建 Visualization 了。
這是創建 Visualization 的畫面:
最簡單的情況,我們只需要把滑鼠放在左邊:
點擊那個 $\oplus$ 符號,一張簡易的圖表就做好了:
之後別忘了儲存 Visualization,再儲存 Dashboard。
重新回到 Kibana 之後,我們可以點擊左上角 Kibana 下方的下拉選單,即可查看 Dashboard:
Alert Rule 警告規則
在 Kibana 主頁,同樣地,我們點擊左上角 Kibana 下方的下拉選單,滑到最下方的 Management
,點擊其中細項 Stack Management
。
進入 Stack Management
頁面,我們可以在左邊看到 Alerts and Insights
(如果你看不到,請確認 Kibana 下方的下拉選單是關閉的),點擊其中細項 Alerts
,之後右上角有一個 Manage Ruls
的鏈接,點下去之後我們就可以新增規則了。
在新增規則時,它會先要求你選擇規則類別,此時我們選擇 Elasticsearch Query
即可:
最上方會讓你輸入規則名字和標籤:
中間讓你選擇你規則觸發條件 Query 的語法格式,我這裏選擇 DSL,也就是我平常操作 Elasticsearch 的方式:
選擇了 DSL 之後,就有四大項目要填:
最上方的 Select indices:填寫你要監控的 Index Pattern 以及該 Index Pattern 代表時間的欄位是什麼,因爲我設定 Nginx 傳輸時間的欄位叫做 @timestamp
,因此我這裏選擇 @timestamp
。
第二個要你填寫 DSL,也就是 Query 的內容,我填寫的是:
{
"query": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
"gte": "now-15m",
"lte": "now"
}
}
},
{
"regexp": {
"message": ".*404.*"
}
}
]
}
}
}
基本上就是篩選出 log 訊息內有 404 字樣且發生在 15 分鐘內的資料。
第三項是要你填寫觸發規則,我填寫的是:當過去五分鐘內的全部內容的總數量超過 3 的時候觸發。
第四項指的是有多少條查詢得到的記錄會被送發給 Rule Action(可以理解爲 Rule 觸發後會執行的行爲)。
那這邊有個小問題,爲什麼我規則已經寫了只要算過去 5 分鐘,我 DSL 還要特別篩選 15 分鐘呢?
按照個人理解,這應該可以減輕 Elasticsearch 和 Kibana 的負擔,因爲不需要處理那麼多不必要資訊;當然他們也有可能有對此進行優化,但我個人是保有這個習慣:永遠不會去請求不需要的資料。
接着我們可以按下下方的 Test Query 來測試 DSL 語法,他會告訴你查詢結果,如:Query matched 0 documents in the last 5m.
最後一項就是 Action,剛才稍微介紹過,就是 Rule 的條件被觸發後,Kibana 系統會採取的行爲,一個 Rule 可能會對應多個 Actions:
由於我們這邊要和 Python 腳本做搭配,因此我們選擇 Index
,它就會把 Alert 資訊寫回 Elasticsearch,基本上選擇 Index 之後,他會要我們先創建一個 Index Connector。Index Connector 就是說,要把 Alert 資料寫到哪個 Index:
我們還可以再做一些細微的調整,例如狀態變更時寄信,還是每次檢查都寄信。
最後,我們可以修改這裏的 Document to index,基本上它決定了你要寫回 Elasticsearch 的內容。具體的設定可以參考 Elastic - Index document example。
再發送數次的 404 請求之後,我們成功的在 alert-index
查詢到了 1 個 alert:
$ curl -kX GET -u elastic:${pswd} "https://localhost:9200/alert-index/_search?pretty"
{
"took" : 0,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "alert-index",
"_id" : "52VRapIB9oE_vdCqLo1N",
"_score" : 1.0,
"_source" : {
"rule_id" : "d6edc773-8437-4ad2-8cf2-5245b4cc24db",
"rule_name" : "nginx-log-checker",
"alert_id" : "query matched",
"context_message" : "Document count is 8 in the last 5m in logs-blog-nginx-dev* index. Alert when greater than 3."
}
}
]
}
}
但這其實不太足以使用,稍作修改,我們將 Document to index 改成如下:
{
"@timestamp": "{{date}}",
"rule_id": "{{rule.id}}",
"rule_name": "{{rule.name}}",
"alert_id": "{{alert.id}}",
"context_message": "{{context.message}}"
}
然後回到 Elasticsearch 主機,新增一條 index template:
curl -kX PUT -u elastic:${pswd} "https://localhost:9200/_template/alert-index-template" -H 'Content-Type: application/json' -d '
{
"index_patterns": ["alert-*"],
"mappings": {
"dynamic_templates": [
{
"timestamps_as_date": {
"match": "@timestamp",
"mapping": {
"type": "date"
}
}
}
]
}
}'
這個 index template 的功能是將 @timestamp
欄位的型別轉換成 date,因此才允許進行時間段的查詢。
如此,新產生的 log 就可以讓我們方便的查詢了:
Kibana 串 Python Bot
在剛剛的 Alert Action 中,你可以看到 Email 選項,並且它是不可選擇的,因爲只有付費版才能使用 Email Connector,因此爲了及時得知 Alert,我們選擇了將 Alert 寫回 Elasticsearch,並且用 Python 去監控該 index。
這裏撰寫了一個簡單的 Python Bot 模板:
import requests
import json
import schedule
import time
import urllib3
from datetime import datetime
alert_index="alert-index"
elastic = "https://localhost:9200"
user = "elastic"
pswd = ""
interval = 10
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def send_alert(log):
print("ALERT: Detected 500 error log!")
print(json.dumps(log, indent=4))
def query_elasticsearch():
url = f"{elastic}/{alert_index}/_search?pretty"
headers = {
'Content-Type': 'application/json'
}
query = {
"query": {
"range": {
"@timestamp": {
"gte": f"now-{interval+2}s"
}
}
},
"sort": [
{
"@timestamp": {
"order": "desc"
}
}
]
}
response = requests.get(
url,
headers=headers,
data=json.dumps(query),
auth=requests.auth.HTTPBasicAuth(user, pswd),
verify=False
)
if response.status_code == 200:
results = response.json()
hits = results.get('hits', {}).get('hits', [])
if hits:
print(f"Logs from the last 30 seconds ({datetime.now()}):")
for hit in hits:
log = hit["_source"]
send_alert(log)
else:
print(f"Failed to fetch logs: {response.status_code} - {response.text}")
print("Starting log query script. Press Ctrl+C to exit.")
try:
while True:
query_elasticsearch()
time.sleep(interval)
except KeyboardInterrupt:
print("Script terminated by user.")
只要修改 send_alert
,就可以將它改成發送 Email、發送 Line 或是 Telegram 訊息。
這裏以 Line Bot 爲例,我們使用 Flask 寫一個簡易 API Server:
import json
import requests
from flask import Flask, request, jsonify
ChannelAccessToken = '' # Place your access token here.
USER_IDS_FILE = 'user_ids.txt'
SECRET_TOKEN = 'abcd1234' # Place a password here.
app = Flask(__name__)
@app.route('/', methods=['GET', 'POST'])
def index():
return 'Hello, World!', 200
@app.route('/webhook', methods=['POST'])
def webhook():
data = request.get_json()
for event in data['events']:
userId = event['source']['userId']
userMessage = event['message']['text']
if userMessage == SECRET_TOKEN:
print(f"Received special message from user {userId}: {userMessage}")
with open(USER_IDS_FILE, 'a') as f:
f.write(f"{userId}\n")
print(f"Recorded userId: {userId}")
return 'OK', 200
@app.route('/send_alert', methods=['POST'])
def send_alert():
data = request.get_json()
if 'message' not in data:
return jsonify({"error": "Missing 'message' in request body"}), 400
alert_message = data['message']
try:
with open(USER_IDS_FILE, 'r') as f:
user_ids = f.readlines()
user_ids = [uid.strip() for uid in user_ids if uid.strip()]
if not user_ids:
return jsonify({"error": "No user IDs found"}), 404
for userId in user_ids:
payload = {
"to": userId,
"messages": [
{
"type": "text",
"text": alert_message
}
]
}
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {ChannelAccessToken}'
}
response = requests.post('https://api.line.me/v2/bot/message/push', headers=headers, data=json.dumps(payload))
if response.status_code != 200:
print(f"Failed to send message to {userId}: {response.text}")
return jsonify({"status": "Alert sent to all users"}), 200
except FileNotFoundError:
return jsonify({"error": "User ID file not found"}), 404
if __name__ == '__main__':
app.run(host='0.0.0.0', port=80, debug=True)
這裏的 ChannelAccessToken
需要去 Line 官方申請,申請可以參考這篇文章 iTHome - LINE Bot 帳號申請。
接着修改 send_alert
函式爲下:
line_bot_api = "http://localhost/send_alert"
def send_alert(message):
payload = {
"message": message
}
headers = {
"Content-Type": "application/json"
}
try:
response = requests.post(line_bot_api, headers=headers, data=json.dumps(payload))
print(f"Status Code: {response.status_code}")
print(f"Response: {response.text}")
except requests.exceptions.RequestException as e:
print(f"An error occurred: {e}")
他的運作原理就是使用者發送特殊的祕密口令給 Line Bot,Line Bot 就會將使用者 ID 搜錄到檔案裏面;當 send_alert
被呼叫的時候,Line Bot 就會把 log 訊息發送給所有記錄在冊的使用者。
效能評測與可擴充性
測試環境
主機:
- 2 * Intel(R) Xeon(R) CPU E5-2640
- 128G 記憶體
- PVE 虛擬環境
每一臺虛擬機的配置皆如下:
- 1 core
- 8 GB RAM
- 32 GB SSD
測試時是在主機空閒時候。
測試結果
- Logstash:單台效能大約為 500 筆/sec,測試 log 爲
"This is test log entry {serial_number}"
- Elasticsearch:對於兩臺 Elasticsearch 組成的 Cluster,外加一臺 Nginx 作爲 Load Balancer,放在同一個 PVE 的虛擬橋接網路下。
- 查詢:300 次/sec,單次查詢爲 100 筆資料。
可以以此爲基礎來配置、部署你的 ELK。
前面已經提及了 Elasticsearch Cluster 的可擴充性,雖然說 Logstash 沒有必要組成 Cluster,但必要的增加 Logstash 來分散壓力也不失爲一種好的選擇,不過這都要經由 Log 量來進行分析。
未來可以做的東西
- Elasticsearch Cluster:更高的 HA、負載平衡,以及資料安全性。
- Elasticsearch RBAC:設置不同的Role,防止某一臺 logstash 被攻陷導致 Elastic 超級管理員權限被濫用。
- Elasticsearch Machine Learning:如果你買得起授權,可以研究看看,參考 Elastic Machine Learning。
- Elastic SIEM:參考 Elastic SIEM is free and open for security analysts everywhere
