前言
上一篇文章, 利用elk系统记录分析所有服务器ssh登录信息 。本篇继续收集所有服务器的history命令历史,同时对集群做出优化。
收集准备
HISTDIR='/var/log/command.log'
# 定义Command日志的格式
export HISTTIMEFORMAT="{\"TIME\":\"%F %T\",\"HOSTNAME\":\"$HOSTNAME\",\"LI\":\"$(who -u am i 2>/dev/null| awk '{print $NF}'|sed -e 's/[()]//g')\",\"LU\":\"$(who am i|awk '{print $1}')\",\"NU\":\"${USER}\",\"CMD\":\""
# 输出日志到指定的log文件
export PROMPT_COMMAND='history 1|tail -1|sed "s/^[ ]\+[0-9]\+ //"|sed "s/$/\"}/">> ${HISTDIR}'
touch /var/log/command.log && chmod 666 /var/log/command.log
filebeat
version: "3"
services:
filebeat:
# 容器名称
container_name: filebeat
# 主机名称
hostname: wlmq-xxx-server-1
# 镜像
image: elastic/filebeat:8.2.0
# 重启机制
restart: always
# 启动用户
user: root
# 持久化挂载
volumes:
# 映射到容器中[作为数据源]
- /var/log/:/log/
# 方便查看数据及日志(可不映射)
- /data/ly-elk/filebeat/logs:/usr/share/filebeat/logs
- /data/ly-elk/filebeat/data:/usr/share/filebeat/data
# 映射配置文件到容器中
- ./conf/filebeat.yml:/usr/share/filebeat/filebeat.yml
# 使用主机网络模式
network_mode: host
filebeat.inputs:
- type: log
enabled: true
paths:
- /log/secure
fields:
log_type: secure
- type: log
enabled: true
paths:
- /log/command.log
fields:
log_type: command
output.logstash:
hosts: ["IP:5044"]
enabled: true
worker: 1
compression_level: 3
loadbalance: true
close_older: 10m
force_close_files: true
logstash
logstash:
image: logstash:7.16.2
container_name: logstash
restart: always
depends_on:
- es-master #logstash在elasticsearch启动之后再启动
- es-slave1
environment:
- "elasticsearch.hosts=http://es-master:9200"
- "xpack.monitoring.elasticsearch.hosts=http://es-master:9200"
- "xpack.monitoring.enabled=true"
- "TZ=Asia/Shanghai"
volumes:
- ./conf/pipelines.yml:/usr/share/logstash/config/pipelines.yml
- ./conf/logstash/:/usr/share/logstash/pipeline
ports:
- "5044:5044" #设置端口
networks:
- elk
- pipeline.id: beats-server
config.string: |
input { beats { port => 5044 } }
output {
if [fields][log_type] == 'secure' {
pipeline { send_to => secure }
} else if [fields][log_type] == 'command' {
pipeline { send_to => command }
}
}
- pipeline.id: logstash-filebeat-secure
path.config: "/usr/share/logstash/pipeline/logstash-filebeat-secure.conf"
- pipeline.id: logstash-filebeat-command
path.config: "/usr/share/logstash/pipeline/logstash-filebeat-command.conf"
input {
pipeline {
address => secure
}
}
filter {
grok {
match => { "message" => ".*sshd\[\d+\]: %{WORD:status} .* %{USER:username} from.*%{IP:clientip}.*" }
}
}
output {
if ([status] == "Accepted" or [status] == "Failed") {
elasticsearch {
hosts => [ "es-master:9200" ]
index => "secure-%{+YYYY.MM.dd}"
}
stdout {
codec => rubydebug
}
}
}
input {
pipeline {
address => command
}
}
filter {
json {
source => "message"
}
}
output {
elasticsearch {
hosts => [ "es-master:9200" ]
index => "command-%{+YYYY.MM.dd}"
codec => "json"
}
stdout {
codec => rubydebug
}
}
评论 (0)