利用elk日志分析系统收集history历史命令

行云流水
2022-05-26 / 0 评论 / 833 阅读 / 正在检测是否收录...

前言

上一篇文章, 利用elk系统记录分析所有服务器ssh登录信息 。本篇继续收集所有服务器的history命令历史,同时对集群做出优化。

收集准备

HISTDIR='/var/log/command.log'
# 定义Command日志的格式
export HISTTIMEFORMAT="{\"TIME\":\"%F %T\",\"HOSTNAME\":\"$HOSTNAME\",\"LI\":\"$(who -u am i 2>/dev/null| awk '{print $NF}'|sed -e 's/[()]//g')\",\"LU\":\"$(who am i|awk '{print $1}')\",\"NU\":\"${USER}\",\"CMD\":\""
# 输出日志到指定的log文件
export PROMPT_COMMAND='history 1|tail -1|sed "s/^[ ]\+[0-9]\+ //"|sed "s/$/\"}/">> ${HISTDIR}'

touch /var/log/command.log && chmod 666 /var/log/command.log

filebeat

version: "3"
services:
  filebeat:
    # 容器名称
    container_name: filebeat
    # 主机名称
    hostname: wlmq-xxx-server-1
    # 镜像
    image: elastic/filebeat:8.2.0
    # 重启机制
    restart: always
    # 启动用户
    user: root
    # 持久化挂载
    volumes:
      # 映射到容器中[作为数据源]
      - /var/log/:/log/

      # 方便查看数据及日志(可不映射)
      - /data/ly-elk/filebeat/logs:/usr/share/filebeat/logs
      - /data/ly-elk/filebeat/data:/usr/share/filebeat/data

      # 映射配置文件到容器中
      - ./conf/filebeat.yml:/usr/share/filebeat/filebeat.yml
    # 使用主机网络模式
    network_mode: host

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /log/secure
  fields:
    log_type: secure

- type: log
  enabled: true
  paths:
    - /log/command.log
  fields:
    log_type: command

output.logstash:
  hosts: ["IP:5044"]
  enabled: true
  worker: 1
  compression_level: 3
  loadbalance: true

close_older: 10m
force_close_files: true

logstash

logstash:
    image: logstash:7.16.2
    container_name:  logstash
    restart: always
    depends_on:
      - es-master #logstash在elasticsearch启动之后再启动
      - es-slave1
    environment:
      - "elasticsearch.hosts=http://es-master:9200"
      - "xpack.monitoring.elasticsearch.hosts=http://es-master:9200"
      - "xpack.monitoring.enabled=true"
      - "TZ=Asia/Shanghai"
    volumes:
      - ./conf/pipelines.yml:/usr/share/logstash/config/pipelines.yml
      - ./conf/logstash/:/usr/share/logstash/pipeline
    ports:
      - "5044:5044" #设置端口
    networks:
      - elk

- pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }
    output {
        if [fields][log_type] == 'secure' {
          pipeline { send_to => secure }
        } else if [fields][log_type] == 'command' {
          pipeline { send_to => command }
        }
    }

- pipeline.id: logstash-filebeat-secure
  path.config: "/usr/share/logstash/pipeline/logstash-filebeat-secure.conf"

- pipeline.id: logstash-filebeat-command
  path.config: "/usr/share/logstash/pipeline/logstash-filebeat-command.conf"

input {
  pipeline {
    address => secure
  }
}

filter {
    grok {
      match => { "message" => ".*sshd\[\d+\]: %{WORD:status} .* %{USER:username} from.*%{IP:clientip}.*" }
    }
}

output {
 if ([status] == "Accepted" or [status] == "Failed") {
  elasticsearch {
   hosts => [ "es-master:9200" ]
   index => "secure-%{+YYYY.MM.dd}"
  }
  stdout {
   codec => rubydebug
  }
 }
}

input {
  pipeline {
    address => command
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  elasticsearch {
   hosts => [ "es-master:9200" ]
   index => "command-%{+YYYY.MM.dd}"
   codec => "json"
  }
  stdout {
   codec => rubydebug
  }
}

kibana

评论 (0)

取消
只有登录/注册用户才可评论