k8sConfigs/efk/fluentd-configMap2.yaml

79 lines
2.2 KiB
YAML
Raw Normal View History

2025-03-09 09:44:53 +08:00
# Fluentd pipeline for the "efk" namespace.
# Accepts JSON events over HTTP (port 8888), enriches candle events with fields
# derived from the tag and the record timestamp, and writes them to
# Elasticsearch twice (per-pair/per-year index and daily logstash-* index),
# plus stdout.
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-config
  namespace: efk
data:
  fluent.conf: |
    # HTTP input: every event is parsed as JSON and routed to the @main label.
    <source>
      @type http
      @id input_http
      port 8888
      # Initial tag; clients must send the full tag,
      # e.g. sardine.log.candle.BTC-USDT.15M
      # NOTE(review): in_http normally takes the tag from the request URL path;
      # confirm this parameter is actually honoured by the plugin version in use.
      tag sardine.log
      @label @main
      <parse>
        @type json
      </parse>
    </source>

    <label @main>
      # Enrich candle events. Expected tag layout:
      #   sardine.log.candle.<coin_pair>.<timeframe>
      # All per-record expressions use ${...}; the "#{...}" form is evaluated
      # only once, when the configuration is loaded, where `record` is undefined.
      <filter sardine.log.candle.**>
        @type record_transformer
        enable_ruby true
        <record>
          # Inject debugging info
          debug_tag "${tag}"
          debug_record "${record.to_json rescue 'record is nil'}"
          # Derive fields from the tag and the record's timestamp
          coin_pair "${tag.split('.')[3] || 'UNKNOWN'}"
          timeframe "${tag.split('.')[4] || 'UNKNOWN'}"
          year "${Time.parse(record['timestamp']).strftime('%Y') rescue 'invalid timestamp'}"
          # Elasticsearch index names must be lowercase and must not contain
          # spaces, so tag parts are downcased and the fallback is 'unknown'.
          index_name "logstash_candle_${(tag.split('.')[3] || 'unknown').downcase}_${Time.parse(record['timestamp']).strftime('%Y') rescue 'unknown'}_${(tag.split('.')[4] || 'unknown').downcase}"
        </record>
      </filter>

      <match sardine.log.candle.**>
        @type copy

        # Store 1: per coin-pair / year / timeframe index.
        # NOTE(review): credentials are stored in plain text in this ConfigMap;
        # consider moving them to a Secret.
        <store>
          @type elasticsearch
          @id output_elasticsearch_custom
          host elasticsearch
          port 9200
          scheme http
          user fluentd_user
          password fluentd_password
          logstash_format false
          # The target index is read from the record's index_name field
          # (set by the filter above); index_name below is only the fallback
          # for records that lack that field.
          target_index_key index_name
          index_name logstash_candle_unknown
          id_key _id
          flush_interval 5s
          @log_level debug
          # Strip the helper/debug fields before the document is indexed.
          remove_keys _id, coin_pair, timeframe, year, index_name, debug_tag, debug_record
        </store>

        # Store 2: conventional daily logstash-YYYY.MM.DD index.
        <store>
          @type elasticsearch
          @id output_elasticsearch
          host elasticsearch
          port 9200
          scheme http
          user fluentd_user
          password fluentd_password
          logstash_format true
          logstash_prefix logstash
          logstash_dateformat %Y.%m.%d
          id_key _id
          flush_interval 5s
          @log_level debug
          remove_keys _id
        </store>

        # Store 3: echo every candle event to stdout.
        <store>
          @type stdout
          @id output_stdout
        </store>
      </match>
    </label>

    # Catch-all for events that are not routed through a label.
    <match **>
      @type stdout
      @id output_stdout_all
    </match>