wazuh服务端telegram-bot报警
wazuh服务telegram机器人报警脚本
在ossec配置文件(ossec.conf)中加入以下integration配置:
<integration>
<name>custom-telegram_alert</name>
<hook_url>机器人的api接口地址</hook_url>
<api_key>chat_id:telegram的频道id(以-开头,不要加引号)</api_key>
<level>11</level> <!-- 设置最低警报级别,低于此级别的警报将不会发送 -->
<alert_format>json</alert_format>
</integration>
然后在ossec的integrations目录下加入下面两个脚本(shell包装脚本和同名的.py脚本)。运行时如果缺少依赖(requests、python-dateutil),用Wazuh自带Python(framework/python/bin/python3)的pip安装即可。
cat integrations/custom-telegram_alert
#!/bin/sh
# Copyright (C) 2024, Your Company.
# Created by Your Name <your.email@example.com>.
# This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
# Wrapper that locates the Python script matching this file's name and runs it
# with the Python interpreter found under the Wazuh installation root.

# Interpreter path, relative to the installation root (WAZUH_PATH).
WPYTHON_BIN="framework/python/bin/python3"

SCRIPT_PATH_NAME="$0"
# Absolute, symlink-resolved directory containing this wrapper. Everything is
# quoted so that paths containing spaces do not get word-split.
DIR_NAME="$(cd "$(dirname "${SCRIPT_PATH_NAME}")" && pwd -P)"
SCRIPT_NAME="$(basename "${SCRIPT_PATH_NAME}")"

# Derive the installation root (unless WAZUH_PATH is already set) and the
# Python script to run from the directory this wrapper lives in.
case "${DIR_NAME}" in
    */active-response/bin | */wodles*)
        if [ -z "${WAZUH_PATH}" ]; then
            WAZUH_PATH="$(cd "${DIR_NAME}/../.." && pwd)"
        fi
        PYTHON_SCRIPT="${DIR_NAME}/${SCRIPT_NAME}.py"
        ;;
    */bin)
        if [ -z "${WAZUH_PATH}" ]; then
            WAZUH_PATH="$(cd "${DIR_NAME}/.." && pwd)"
        fi
        # Scripts under framework/scripts use underscores where the wrapper
        # name uses dashes.
        PYTHON_SCRIPT="${WAZUH_PATH}/framework/scripts/$(echo "${SCRIPT_NAME}" | sed 's/-/_/g').py"
        ;;
    */integrations)
        if [ -z "${WAZUH_PATH}" ]; then
            WAZUH_PATH="$(cd "${DIR_NAME}/.." && pwd)"
        fi
        PYTHON_SCRIPT="${DIR_NAME}/${SCRIPT_NAME}.py"
        ;;
    *)
        # Without this branch PYTHON_SCRIPT stays empty and the interpreter
        # would be started with no script at all.
        echo "${SCRIPT_NAME}: unsupported install directory: ${DIR_NAME}" >&2
        exit 1
        ;;
esac

# NOTE(review): the arguments are logged verbatim; presumably they include the
# hook URL passed by the integration - confirm that is acceptable for this log.
echo "$(date) - Running ${PYTHON_SCRIPT} with arguments: $*" >> /var/ossec/logs/integrations.log

# Last command: its exit status becomes the wrapper's exit status.
"${WAZUH_PATH}/${WPYTHON_BIN}" "${PYTHON_SCRIPT}" "$@"
cat integrations/custom-telegram_alert.py
#!/usr/bin/env python3
import sys
import json
import requests
import logging
import re
from dateutil import parser
from datetime import timedelta
# Placeholder used throughout for "this field is absent from the alert".
_MISSING = 'Not Found'

# Telegram's sendMessage accepts at most 4096 characters of text; longer
# messages are rejected, so the text is cut to this length before sending.
_TELEGRAM_TEXT_LIMIT = 4096

# Seconds to wait for the HTTP request before giving up.
_HTTP_TIMEOUT = 10


def _china_time_text(timestamp):
    """Return *timestamp* as 'YYYY-mm-dd HH:MM:SS' in UTC+8, or 'Not Found'.

    A timestamp carrying a UTC offset is first normalised to UTC; one without
    an offset is taken to be UTC already. A missing or unparseable timestamp
    yields 'Not Found' instead of aborting the whole alert.
    """
    if timestamp == _MISSING:
        return _MISSING
    try:
        parsed = parser.parse(timestamp)
    except (ValueError, OverflowError, TypeError):
        logging.warning("Could not parse alert timestamp: %r", timestamp)
        return _MISSING
    offset = parsed.utcoffset()
    if offset is not None:
        # Strip the offset so the +8h shift always starts from UTC.
        parsed = parsed.replace(tzinfo=None) - offset
    return (parsed + timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')


def _build_message(alert_json):
    """Build the Telegram message text from a decoded alert dictionary.

    Every field is optional: a field that is absent from the alert is simply
    left out of the message. The result is cut to the Telegram length limit.
    """
    data = alert_json.get('data', {})
    rule = alert_json.get('rule', {})
    agent = alert_json.get('agent', {})

    full_log = alert_json.get('full_log', _MISSING)
    device = _MISSING
    if full_log != _MISSING:
        # Raw string: '\w' in a plain string literal is an invalid escape.
        match = re.search(r'device (\w+)', full_log)
        if match:
            device = match.group(1)

    command = data.get('command', _MISSING)
    if isinstance(command, str):
        # '#040' stands for a space in the logged command line.
        command = command.replace('#040', ' ')

    agent_id = agent.get('id', _MISSING)
    agent_name = agent.get('name', _MISSING)
    agent_line = _MISSING
    if agent_id != _MISSING and agent_name != _MISSING:
        agent_line = f"{agent_id} {agent_name}"

    # (template, value) pairs in the order they appear in the message.
    fields = [
        ("FIM警报:[{}]", alert_json.get('syscheck', {}).get('path', _MISSING)),
        ("预解码器程序名:{}",
         alert_json.get('predecoder', {}).get('program_name', _MISSING)),
        ("管理器名:{}", alert_json.get('manager', {}).get('name', _MISSING)),
        ("先前的输出:{}", alert_json.get('previous_output', _MISSING)),
        ("完整日志:{}", full_log),
        ("先前的日志:{}", alert_json.get('previous_log', _MISSING)),
        ("触发时间:[{}]",
         _china_time_text(alert_json.get('timestamp', _MISSING))),
        ("日志来源:{}", alert_json.get('location', _MISSING)),
        ("状态:{}", rule.get('description', _MISSING)),
        ("规则 ID:{}", rule.get('id', _MISSING)),
        ("警报级别:{}", rule.get('level', _MISSING)),
        ("代理:{}", agent_line),
        # Top-level value first (as before); fall back to the 'data' section,
        # where the sibling fields tty/pwd/command are read from.
        ("源用户:{}",
         alert_json.get('srcuser', data.get('srcuser', _MISSING))),
        ("目标用户:{}",
         alert_json.get('dstuser', data.get('dstuser', _MISSING))),
        ("终端:{}", data.get('tty', _MISSING)),
        ("执行的命令:{}", command),
        ("工作目录:{}", data.get('pwd', _MISSING)),
        ("报警主机 IP:{}", agent.get('ip', _MISSING)),
        ("报警主机名称:{}", agent_name),
    ]
    text = "".join(
        template.format(value) + "\n"
        for template, value in fields
        if value != _MISSING
    )
    # The device line is the last one and carries no trailing newline.
    if device != _MISSING:
        text += f"网络设备:{device}"
    return text[:_TELEGRAM_TEXT_LIMIT]


def main():
    """Read the alert file named on the command line and post it to Telegram.

    Expected arguments: ``<alert_file> <api_key> <hook_url>`` where
    ``api_key`` looks like ``chat_id:<id>``; a bare ``<id>`` is accepted too.
    Failures are logged to the integrations log and never raised, so a bad
    alert cannot crash the caller.
    """
    logging.basicConfig(filename='/var/ossec/logs/integrations.log', level=logging.INFO)
    try:
        if len(sys.argv) < 4:
            logging.error(
                "Expected 3 arguments (alert file, api_key, hook_url), got %d",
                len(sys.argv) - 1,
            )
            return
        alert_file_path = sys.argv[1]
        _, separator, chat_id = sys.argv[2].partition(':')
        if not separator:
            chat_id = sys.argv[2]
        hook_url = sys.argv[3]
        # The hook URL is deliberately not logged: it embeds the bot token.
        logging.info(
            'Received alert_file_path: %s, chat_id: %s',
            alert_file_path, chat_id,
        )

        with open(alert_file_path, 'r', encoding='utf-8') as alert_file:
            alert_json = json.load(alert_file)

        request_data = {
            'chat_id': chat_id,
            'text': _build_message(alert_json),
        }
        # A timeout keeps an unreachable API from blocking this process
        # forever.
        response = requests.post(
            hook_url,
            data=json.dumps(request_data),
            headers={'Content-Type': 'application/json'},
            timeout=_HTTP_TIMEOUT,
        )
        logging.info("Response status code: %s", response.status_code)
        logging.info("Response text: %s", response.text)
    except json.JSONDecodeError:
        logging.exception("Error decoding JSON:")
    except Exception:
        # Top-level boundary: record the failure instead of propagating it.
        logging.exception("An error occurred:")


if __name__ == "__main__":
    main()
给脚本执行权限,然后可以手动测试一次脚本看看机器人是否可以发送信息,创建一个测试文件test.json
{"timestamp":"2024-05-22T03:42:40.154+0000","rule":{"level":11,"description":"网卡接口进入混杂(嗅探)模式.","id":"5104","mitre":{"id":["T1040"],"tactic":["Credential Access","Discovery"],"technique":["Network Sniffing"]},"firedtimes":3,"mail":false,"groups":["syslog","linuxkernel","promisc"],"pci_dss":["10.6.1","11.4"],"gpg13":["4.13"],"gdpr":["IV_35.7.d"],"hipaa":["164.312.b"],"nist_800_53":["AU.6","SI.4"],"tsc":["CC7.2","CC7.3","CC6.1","CC6.8"]},"agent":{"id":"085","name":"CAM-nginx","ip":"172.30.45.46"},"manager":{"name":"ip-172-30-33-46.ap-southeast-1.compute.internal"},"id":"1716349360.388101","full_log":"May 22 03:42:38 ip-111-11-11-11 kernel: device eth0 entered promiscuous mode","predecoder":{"program_name":"kernel","timestamp":"May 22 03:42:38","hostname":"ip-111-11-11-11"},"decoder":{"name":"kernel"},"location":"/var/log/messages"}
手动执行脚本
python3 /var/ossec/integrations/custom-telegram_alert.py test.json chat_id:-4179449976 https://api.telegram.org/bot<你的bot_token>/sendMessage
机器人能正常发送消息后,重启wazuh服务端,查看系统日志是否有报错;无异常后查看报警日志 logs/alerts/alerts.log,并尝试触发一些报警,看看机器人是否可以输出消息。
评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果
音乐天地