Wazuh Telegram bot alert script. First, register your own script as an integration in the ossec configuration file:

  <integration>
    <name>custom-telegram_alert</name>
    <hook_url>Bot API endpoint URL</hook_url>
    <api_key>chat_id:"Telegram channel ID, starting with -"</api_key>
    <level>11</level>  <!-- Minimum alert level; alerts below this level are not sent -->
    <alert_format>json</alert_format>
  </integration>
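The Python script listed further down reads these settings from its command line: the alert file path as the first argument, the `<api_key>` value as the second, and the `<hook_url>` value as the third. A minimal sketch of that parsing, assuming the `chat_id:<id>` convention used in this config. `parse_integration_args` and `IntegrationArgs` are hypothetical names for illustration, not part of Wazuh, and the chat ID and URL below are placeholders:

```python
from typing import List, NamedTuple


class IntegrationArgs(NamedTuple):
    alert_file: str
    chat_id: str
    hook_url: str


def parse_integration_args(argv: List[str]) -> IntegrationArgs:
    """Split the argument vector the way custom-telegram_alert.py reads it."""
    if len(argv) < 4:
        raise ValueError("expected: <alert_file> <api_key> <hook_url>")
    alert_file, api_key, hook_url = argv[1], argv[2], argv[3]
    # The api_key field carries "chat_id:<id>"; split only on the first colon.
    prefix, sep, chat_id = api_key.partition(":")
    if prefix != "chat_id" or not sep or not chat_id:
        raise ValueError("api_key must look like chat_id:<id>")
    # Drop surrounding quotes in case the ID was written with them in the config.
    return IntegrationArgs(alert_file, chat_id.strip('"'), hook_url)


args = parse_integration_args([
    "custom-telegram_alert.py",
    "/tmp/alert.json",
    "chat_id:-1001234567890",
    "https://api.telegram.org/bot<TOKEN>/sendMessage",
])
print(args.chat_id)
```

Stripping the quotes is a defensive choice in this sketch: the script below takes everything after the colon verbatim, so quotes left in the config value would end up in the chat ID.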

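Then add the two scripts below to the integrations directory of the ossec installation. If a dependency is missing at run time, install it with pip.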
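A quick way to see which dependencies are missing is to ask the interpreter directly. The sketch below is an illustration, not part of the integration; `missing_modules` is a hypothetical helper. It should be run with the same Python interpreter that will run the integration (the wrapper script below uses `framework/python/bin/python3` under the Wazuh path). The two third-party modules the Python script imports are `requests` and `dateutil`, which pip provides as the `requests` and `python-dateutil` packages.

```python
import importlib.util
from typing import Iterable, List


def missing_modules(names: Iterable[str]) -> List[str]:
    """Return the top-level module names this interpreter cannot import."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# Third-party modules imported by custom-telegram_alert.py.
required = ["requests", "dateutil"]
missing = missing_modules(required)
if missing:
    print("missing modules:", ", ".join(missing))
else:
    print("all dependencies are importable")
```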

cat integrations/custom-telegram_alert 
#!/bin/sh
# Copyright (C) 2024, Your Company.
# Created by Your Name <your.email@example.com>.
# This program is free software; you can redistribute it and/or modify it under the terms of GPLv2

WPYTHON_BIN="framework/python/bin/python3"

SCRIPT_PATH_NAME="$0"

DIR_NAME="$(cd $(dirname ${SCRIPT_PATH_NAME}); pwd -P)"
SCRIPT_NAME="$(basename ${SCRIPT_PATH_NAME})"

case ${DIR_NAME} in
    */active-response/bin | */wodles*)
        if [ -z "${WAZUH_PATH}" ]; then
            WAZUH_PATH="$(cd ${DIR_NAME}/../..; pwd)"
        fi

        PYTHON_SCRIPT="${DIR_NAME}/${SCRIPT_NAME}.py"
    ;;
    */bin)
        if [ -z "${WAZUH_PATH}" ]; then
            WAZUH_PATH="$(cd ${DIR_NAME}/..; pwd)"
        fi

        PYTHON_SCRIPT="${WAZUH_PATH}/framework/scripts/$(echo ${SCRIPT_NAME} | sed 's/\-/_/g').py"
    ;;
    */integrations)
        if [ -z "${WAZUH_PATH}" ]; then
            WAZUH_PATH="$(cd ${DIR_NAME}/..; pwd)"
        fi

        PYTHON_SCRIPT="${DIR_NAME}/${SCRIPT_NAME}.py"
    ;;
esac

echo "$(date) - Running ${PYTHON_SCRIPT} with arguments: $@" >> /var/ossec/logs/integrations.log
${WAZUH_PATH}/${WPYTHON_BIN} ${PYTHON_SCRIPT} "$@"

cat integrations/custom-telegram_alert.py 
#!/usr/bin/env python3
import sys
import json
import requests
import logging
import re
from dateutil import parser
from datetime import timedelta

def main():
    logging.basicConfig(filename='/var/ossec/logs/integrations.log', level=logging.INFO)
    try:
        alert_file_path = sys.argv[1]
        chat_id = sys.argv[2].split(':')[1]
        hook_url = sys.argv[3]

        logging.info(f'Received alert_file_path: {alert_file_path}, chat_id: {chat_id}, hook_url: {hook_url}')

        with open(alert_file_path, 'r') as alert_file:
            alert_json = json.load(alert_file)

        data = alert_json.get('data', {})

        alert_level = alert_json['rule']['level']
        ruleid = alert_json['rule']['id']
        description = alert_json['rule']['description']
        agentid = alert_json['agent']['id']
        agentname = alert_json['agent']['name']
        timestamp = alert_json.get('timestamp', 'Not Found')
        location = alert_json.get('location', 'Not Found')
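        # Decoded user fields normally sit under 'data'; fall back to the top level
        srcuser = data.get('srcuser', alert_json.get('srcuser', 'Not Found'))
        dstuser = data.get('dstuser', alert_json.get('dstuser', 'Not Found'))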
        tty = data.get('tty', 'Not Found')
        pwd = data.get('pwd', 'Not Found')
        command = data.get('command', 'Not Found').replace('#040', ' ')
        previous_output = alert_json.get('previous_output', 'Not Found')
        full_log = alert_json.get('full_log', 'Not Found')
        previous_log = alert_json.get('previous_log', 'Not Found')
        predecoder = alert_json.get('predecoder', 'Not Found')
        manager = alert_json.get('manager', 'Not Found')
        agent = alert_json.get('agent', 'Not Found')

        agent_ip = 'Not Found'
        agent_name = 'Not Found'
        if agent != 'Not Found':
            agent_ip = agent.get('ip', 'Not Found')
            agent_name = agent.get('name', 'Not Found')


        device = 'Not Found'
        if full_log != 'Not Found':
            match = re.search(r'device (\w+)', full_log)
            if match:
                device = match.group(1)

        if predecoder != 'Not Found':
            predecoder = predecoder.get('program_name', 'Not Found')
        if manager != 'Not Found':
            manager = manager.get('name', 'Not Found')

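        # Parse the timestamp and add 8 hours (assumes the alert timestamp is in UTC)
        timestamp_china_str = 'Not Found'
        if timestamp != 'Not Found':
            timestamp_utc = parser.parse(timestamp)
            timestamp_china = timestamp_utc + timedelta(hours=8)
            timestamp_china_str = timestamp_china.strftime('%Y-%m-%d %H:%M:%S')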

        # Extract the path, use 'Not Found' if 'syscheck' or 'path' is not in the JSON
        path = alert_json.get('syscheck', {}).get('path', 'Not Found')

        # Create the message text
        message_text = ""
        if path != 'Not Found':
            message_text += f"FIM alert: [{path}]\n"
        if predecoder != 'Not Found':
            message_text += f"Predecoder program name: {predecoder}\n"
        if manager != 'Not Found':
            message_text += f"Manager name: {manager}\n"
        if previous_output != 'Not Found':
            message_text += f"Previous output: {previous_output}\n"
        if full_log != 'Not Found':
            message_text += f"Full log: {full_log}\n"
        if previous_log != 'Not Found':
            message_text += f"Previous log: {previous_log}\n"
        if timestamp_china_str != 'Not Found':
            message_text += f"Triggered at: [{timestamp_china_str}]\n"
        if location != 'Not Found':
            message_text += f"Log source: {location}\n"
        if description != 'Not Found':
            message_text += f"Status: {description}\n"
        if ruleid != 'Not Found':
            message_text += f"Rule ID: {ruleid}\n"
        if alert_level != 'Not Found':
            message_text += f"Alert level: {alert_level}\n"
        if agentid != 'Not Found' and agentname != 'Not Found':
            message_text += f"Agent: {agentid} {agentname}\n"
        if srcuser != 'Not Found':
            message_text += f"Source user: {srcuser}\n"
        if dstuser != 'Not Found':
            message_text += f"Destination user: {dstuser}\n"
        if tty != 'Not Found':
            message_text += f"Terminal: {tty}\n"
        if command != 'Not Found':
            message_text += f"Command executed: {command}\n"
        if pwd != 'Not Found':
            message_text += f"Working directory: {pwd}\n"
        if agent_ip != 'Not Found':
            message_text += f"Alerting host IP: {agent_ip}\n"
        if agent_name != 'Not Found':
            message_text += f"Alerting host name: {agent_name}\n"
        if device != 'Not Found':
            message_text += f"Network device: {device}"

        # Generate request
        request_data = {
            'chat_id': chat_id,
            'text': message_text
        }
        # json= serializes the body and sets the Content-Type header; the timeout avoids hanging forever
        response = requests.post(hook_url, json=request_data, timeout=10)
        logging.info(f"Response status code: {response.status_code}")
        logging.info(f"Response text: {response.text}")

    except json.JSONDecodeError:
        logging.exception("Error decoding JSON:")
    except Exception:
        logging.exception("An error occurred:")

if __name__ == "__main__":
    main()
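The script above shifts the parsed timestamp by a flat 8 hours, which gives UTC+8 wall time only when the alert timestamp carries a `+0000` offset. As a hedged alternative, the sketch below converts by offset instead, so a timestamp that is already at `+0800` is not shifted twice. It uses only the standard library and assumes the timestamp layout of the test alert in this note (`YYYY-MM-DDTHH:MM:SS.mmm+hhmm`); `to_utc8` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

UTC_PLUS_8 = timezone(timedelta(hours=8))


def to_utc8(timestamp: str) -> str:
    """Convert a timestamp with a numeric UTC offset to UTC+8 wall time."""
    parsed = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z")
    return parsed.astimezone(UTC_PLUS_8).strftime("%Y-%m-%d %H:%M:%S")


print(to_utc8("2024-05-22T03:42:40.154+0000"))  # 2024-05-22 11:42:40
print(to_utc8("2024-05-22T11:42:40.154+0800"))  # 2024-05-22 11:42:40
```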

Make the scripts executable, then run the Python script once by hand to check that the bot can send messages. Create a test file, test.json:

{"timestamp":"2024-05-22T03:42:40.154+0000","rule":{"level":11,"description":"Network interface entered promiscuous (sniffing) mode.","id":"5104","mitre":{"id":["T1040"],"tactic":["Credential Access","Discovery"],"technique":["Network Sniffing"]},"firedtimes":3,"mail":false,"groups":["syslog","linuxkernel","promisc"],"pci_dss":["10.6.1","11.4"],"gpg13":["4.13"],"gdpr":["IV_35.7.d"],"hipaa":["164.312.b"],"nist_800_53":["AU.6","SI.4"],"tsc":["CC7.2","CC7.3","CC6.1","CC6.8"]},"agent":{"id":"085","name":"CAM-nginx","ip":"172.30.45.46"},"manager":{"name":"ip-172-30-33-46.ap-southeast-1.compute.internal"},"id":"1716349360.388101","full_log":"May 22 03:42:38 ip-111-11-11-11 kernel: device eth0 entered promiscuous mode","predecoder":{"program_name":"kernel","timestamp":"May 22 03:42:38","hostname":"ip-111-11-11-11"},"decoder":{"name":"kernel"},"location":"/var/log/messages"}
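To see what the script will pull out of this test alert before sending anything, the relevant fields can be read offline. The sketch below uses an abbreviated copy of the alert (only the fields it needs) and the same `device (\w+)` pattern as the script; `extract_device` is a hypothetical helper name.

```python
import json
import re
from typing import Optional

# Abbreviated copy of the test alert: only the fields used in this sketch.
SAMPLE_ALERT = """
{"rule": {"level": 11, "id": "5104"},
 "agent": {"id": "085", "name": "CAM-nginx", "ip": "172.30.45.46"},
 "full_log": "May 22 03:42:38 ip-111-11-11-11 kernel: device eth0 entered promiscuous mode"}
"""


def extract_device(full_log: str) -> Optional[str]:
    """Return the interface name following the word 'device', if any."""
    match = re.search(r"device (\w+)", full_log)
    return match.group(1) if match else None


alert = json.loads(SAMPLE_ALERT)
print(alert["rule"]["id"], alert["agent"]["name"], extract_device(alert["full_log"]))
# 5104 CAM-nginx eth0
```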

Run the script by hand:

python3  /var/ossec/integrations/custom-telegram_alert.py test.json chat_id:-4179449976 https://api.telegram.org/bot7080865699:AAFMujnFpE2WU0IP25loG7wQqWBojRCMnzo/sendMessage
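The script writes the HTTP status and the raw response text to /var/ossec/logs/integrations.log. Telegram Bot API responses are JSON objects with a boolean `ok` field and, on failure, a `description`. A small sketch for interpreting that response text follows; `check_telegram_response` is a hypothetical helper, and the two sample bodies are made up for illustration, not captured from a real run.

```python
import json
from typing import Tuple


def check_telegram_response(body: str) -> Tuple[bool, str]:
    """Return (ok, detail) for a raw Bot API response body."""
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        return False, "response is not JSON"
    if not isinstance(payload, dict):
        return False, "unexpected response shape"
    if payload.get("ok") is True:
        return True, "message accepted"
    return False, str(payload.get("description", "unknown error"))


# Illustrative response bodies.
print(check_telegram_response('{"ok": true, "result": {"message_id": 1}}'))
print(check_telegram_response('{"ok": false, "error_code": 400, "description": "Bad Request: chat not found"}'))
```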

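The bot sends the message normally. Restart the Wazuh server and check the system log for errors. If there are none, check the alert log, logs/alerts/alerts.log, and trigger a few alerts to see whether the bot delivers them.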
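Once real alerts are flowing, failed deliveries can be spotted by scanning the integration log for the status lines the script writes. A minimal sketch, assuming the default `logging.basicConfig` line format (`INFO:root:...`) and the `Response status code:` message used in the script; `failed_statuses` is a hypothetical helper and the sample lines are illustrative.

```python
import re
from typing import Iterable, List

STATUS_RE = re.compile(r"Response status code: (\d{3})")


def failed_statuses(lines: Iterable[str]) -> List[int]:
    """Collect logged HTTP status codes other than 200."""
    codes = []
    for line in lines:
        match = STATUS_RE.search(line)
        if match and match.group(1) != "200":
            codes.append(int(match.group(1)))
    return codes


# Illustrative log lines in the default basicConfig format.
sample = [
    "INFO:root:Response status code: 200",
    "INFO:root:Response status code: 400",
    "ERROR:root:An error occurred:",
]
print(failed_statuses(sample))  # [400]
```

To run it against the real log, pass an open file object for /var/ossec/logs/integrations.log instead of the sample list.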