This commit is contained in:
zhangkun9038@dingtalk.com 2025-03-10 11:20:33 +08:00
parent 6640546347
commit b9e433a769
4 changed files with 128 additions and 47 deletions

View File

@ -9,46 +9,12 @@ data:
@type http
@id input_http
port 8888
tag sardine.log # 初始 tag客户端需指定完整 tag 如 sardine.log.candle.BTC-USDT.15M
@label @main
<parse>
@type json
</parse>
</source>
<label @main>
<filter sardine.log.candle.**>
@type record_transformer
enable_ruby true
<record>
# 注入调试信息
debug_tag "${tag}"
debug_record "#{record.to_json rescue 'record is nil'}"
# 处理 tag 和 timestamp
coin_pair "${tag.split('.')[3] || 'UNKNOWN'}"
timeframe "${tag.split('.')[4] || 'UNKNOWN'}"
year "#{Time.parse(record['timestamp']).strftime('%Y') rescue 'invalid timestamp'}"
index_name "logstash_candle_${tag.split('.')[3]}_#{Time.parse(record['timestamp']).strftime('%Y') rescue 'invalid timestamp'}_${tag.split('.')[4]}"
</record>
</filter>
<match sardine.log.candle.**>
<match sardine.log.**>
@type copy
<store>
@type elasticsearch
@id output_elasticsearch_custom
host elasticsearch
port 9200
scheme http
user fluentd_user
password fluentd_password
logstash_format false
index_name %{index_name}
id_key _id
flush_interval 5s
@log_level debug
remove_keys _id, coin_pair, timeframe, year, index_name, debug_tag, debug_record
</store>
<store>
@type elasticsearch
@id output_elasticsearch
@ -60,9 +26,9 @@ data:
logstash_format true
logstash_prefix logstash
logstash_dateformat %Y.%m.%d
id_key _id
flush_interval 5s
@log_level debug
id_key _id
remove_keys _id
</store>
<store>
@ -70,6 +36,40 @@ data:
@id output_stdout
</store>
</match>
<match tanya.**>
@type rewrite_tag_filter
@id rewrite_tag_for_tanya
<rule>
key @log_name
pattern ^tanya\.candle\.([^\.]+)\.([^\.]+)\.([^\.]+)$
tag candle.$1.$2.$3
</rule>
</match>
<match candle.**>
@type copy
<store>
@type elasticsearch
@id output_elasticsearch_tanya
host elasticsearch
port 9200
scheme http
user fluentd_user
password fluentd_password
index_name candle_${tag_parts[1]}_${tag_parts[2]}_${tag_parts[3]}
flush_interval 5s
@log_level debug
id_key _id
remove_keys _id
include_tag_key true
tag_key @log_name
</store>
<store>
@type stdout
@id output_stdout_tanya
</store>
</match>
</label>
<match **>

View File

@ -3,18 +3,37 @@
@id input_http
port 8888
tag sardine.log
@label @main
</source>
<match sardine.log>
@type elasticsearch
@id output_elasticsearch
host elasticsearch
port 9200
scheme http
user fluentd_user
password fluentd_password
logstash_format true
logstash_prefix logstash
logstash_dateformat %Y.%m.%d
<label @main>
<match sardine.log.**>
@type copy
<store>
@type elasticsearch
@id output_elasticsearch
host elasticsearch
port 9200
scheme http
user fluentd_user
password fluentd_password
logstash_format true
logstash_prefix logstash
logstash_dateformat %Y.%m.%d
flush_interval 5s # 缩短批量写入间隔
@log_level debug
id_key _id
remove_keys _id
</store>
<store>
@type stdout
@id output_stdout
</store>
</match>
</label>
<match **>
@type stdout
@id output_stdout_all
</match>

60
efk/fluentd1.conf Normal file
View File

@ -0,0 +1,60 @@
<source>
@type http
@id input_http
port 8888
@label @main
</source>
<label @main>
<match sardine.log.**>
@type copy
<store>
@type elasticsearch
@id output_elasticsearch
host elasticsearch
port 9200
scheme http
user fluentd_user
password fluentd_password
logstash_format true
logstash_prefix logstash
logstash_dateformat %Y.%m.%d
flush_interval 5s
@log_level debug
id_key _id
remove_keys _id
</store>
<store>
@type stdout
@id output_stdout
</store>
</match>
<match tanya.**>
@type copy
<store>
@type elasticsearch
@id output_elasticsearch_tanya
host elasticsearch
port 9200
scheme http
user fluentd_user
password fluentd_password
logstash_format false
index_name logstash_candle_${tag_parts[1]}_${tag_parts[2]}_${tag_parts[3]}
flush_interval 5s
@log_level debug
id_key _id
remove_keys _id
</store>
<store>
@type stdout
@id output_stdout_tanya
</store>
</match>
</label>
<match **>
@type stdout
@id output_stdout_all
</match>

2
efk/myFluentd.Dockerfile Normal file
View File

@ -0,0 +1,2 @@
FROM fluent/fluentd-kubernetes-daemonset:v1.16-debian-elasticsearch8-2
RUN fluent-gem install fluent-plugin-rewrite-tag-filter