Передача событий аудита
События аудита Памир передаются через RabbitMQ и могут быть обработаны встроенным в систему Logstash.
Описание формирования событий аудита и перечень ключей маршрутизации приведены в документе Действия в системе.
Принцип работы
RabbitMQ принимает сообщения Памир и публикует их в exchange, например pamir_audit_exchange.
Logstash подключается к RabbitMQ, создает или использует очереди для аудита, извлекает нужные события по ключу маршрутизации, фильтрует их по роли пользователя и по названию прикладного объекта, после чего:
- передает события во встроенный OpenSearch;
- при необходимости передает события во внешние системы.
Схема передачи событий:
Использование Logstash
Logstash входит в состав дистрибутива Памир и используется для получения событий из RabbitMQ, их фильтрации и последующей передачи:
- во встроенный OpenSearch;
- во внешние системы.
Для добавления собственных pipeline необходимо в файл docker-compose.additional.yml добавить дополнительные volume для файлов конфигурации pipeline и файла pipelines.yml.
Пример:
logstash:
env_file:
- .opensearch.env
volumes:
- /home/pamir/deploy/data/logstash-core/pipeline/pamir-audit.conf:/usr/share/logstash/pipeline/pamir-audit.conf
- /home/pamir/deploy/data/logstash-core/pipeline/fesb-trace.conf:/usr/share/logstash/pipeline/fesb-trace.conf
- /home/pamir/deploy/data/logstash-core/pipeline/pamir-user-activity.conf:/usr/share/logstash/pipeline/pamir-user-activity.conf
- /home/pamir/deploy/data/logstash-core/pipelines.yml:/usr/share/logstash/config/pipelines.yml
Параметры подключения для pipeline Logstash задаются в файле окружения, подключённом через env_file.
Пример переменных окружения:
RABBITMQ_HOST=rabbitmq
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_VHOST=vhost
OPENSEARCH_LOGSTASH_PASSWORD=pswd
Настройка pipeline Logstash
Конфигурацию pipeline необходимо записать в отдельный файл, например pamir-audit.conf, и подключить его через volumes и pipelines.yml.
Пример конфигурации:
input {
rabbitmq {
host => "${RABBITMQ_HOST:rabbitmq}"
port => "${RABBITMQ_PORT:5672}"
user => "${RABBITMQ_USER}"
password => "${RABBITMQ_PASSWORD}"
vhost => "${RABBITMQ_VHOST:vhost}"
# События {vars.name} в exchange RabbitMQ типа topic
exchange => "pamir_audit_exchange"
exchange_type => "topic"
durable => true
# Routing key для фильтрации сообщений.
# Перечень событий и ключей маршрутизации приведён в разделе Действия в системе.
key => "#"
# Logstash создаёт очередь автоматически, если она отсутствует.
queue => "logstash-audit-queue"
auto_delete => false
exclusive => false
durable => true
ack => true
prefetch_count => 200
passive => false
metadata_enabled => "none"
codec => json { ecs_compatibility => "disabled" }
}
}
filter {
date {
match => ["timestamp", "ISO8601"]
target => "@timestamp"
}
mutate {
remove_field => ["timestamp"]
}
# Разделение событий по ролям пользователей.
if [user_role] in ["pamir_root", "pamir_admin", "pamir_admin_ib"] {
mutate {
add_field => { "[@metadata][index_prefix]" => "pamir-sec-crud-admin-logs" }
}
} else {
mutate {
add_field => { "[@metadata][index_prefix]" => "pamir-sec-crud-user-logs" }
}
}
}
output {
# --- OpenSearch ---
# Передача событий аудита в OpenSearch для централизованного хранения, поиска и последующего анализа.
opensearch {
hosts => ["https://opensearch:9200"]
auth_type => {
type => "basic"
user => "pamir-logstash-user"
password => "${OPENSEARCH_LOGSTASH_PASSWORD}"
}
ssl_certificate_verification => false
index => "%{[@metadata][index_prefix]}-%{+YYYY.MM.dd}"
manage_template => false
}
# --- SYSLOG ---
# Передача событий аудита на внешний сервер сбора и анализа
syslog {
id => "audit_syslog_tls"
host => "${SYSLOG_HOST:syslog.example.local}"
port => "${SYSLOG_PORT:6514}"
protocol => "ssl-tcp"
rfc => "rfc5424"
appname => "pamir-audit"
facility => "local0"
severity => "informational"
sourcehost => "%{[host]}"
message => '{"event_type":"%{[event_type]}","user_id":"%{[user_id]}","user_login":"%{[user_login]}","user_role":"%{[user_role]}","endpoint":"%{[endpoint]}","parameters":"%{[parameters]}","response_status_code":"%{[response_status_code]}","@timestamp":"%{[@timestamp]}"}'
ssl_certificate_authorities => ["/usr/share/logstash/config/certs/syslog-ca.crt"]
ssl_verify => true
}
# --- REST API ---
# Передача событий аудита во внешний сервис по REST API с использованием HTTPS.
http {
id => "audit_rest_https"
url => "https://audit-collector.example.local/api/v1/audit"
http_method => "post"
format => "json"
content_type => "application/json"
headers => {
"Authorization" => "Bearer ${AUDIT_API_TOKEN}"
"X-Source-System" => "pamir"
}
automatic_retries => 2
retry_failed => true
ssl_enabled => true
ssl_verification_mode => "full"
ssl_certificate_authorities => ["/usr/share/logstash/config/certs/api-ca.crt"]
}
# --- JDBC ---
# Передача событий аудита во внешнюю реляционную базу данных по JDBC.
jdbc {
driver_class => "org.postgresql.Driver"
connection_string => "jdbc:postgresql://postgres:5432/auditdb?ssl=true&sslmode=verify-full"
username => "${JDBC_USER}"
password => "${JDBC_PASSWORD}"
statement => [
"INSERT INTO audit_events (event_time, event_type, user_id, user_login, user_role, endpoint, parameters, response_status_code, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
"@timestamp",
"event_type",
"user_id",
"user_login",
"user_role",
"endpoint",
"parameters",
"response_status_code",
"payload"
]
}
}
В приведённой конфигурации:
- блок
input.rabbitmqподключаетLogstashк RabbitMQ и получает события из exchangepamir_audit_exchangeчерез очередьlogstash-audit-queue; - параметр
keyзадаёт перечень ключей маршрутизации, по которым будут отбираться события аудита; - блок
filter.dateпреобразует время события в поле@timestamp; - блок
filter.mutateудаляет исходное полеtimestampпосле преобразования; - блок
filterс проверкойuser_roleразделяет события административных пользователей и остальных пользователей по разным индексам OpenSearch; - блок
output.opensearchпередаёт события во встроенный OpenSearch; - дополнительные блоки
outputмогут использоваться для передачи событий во внешние системы.
Таким образом конфигурация позволяет получать события аудита из RabbitMQ, фильтровать их по ключу маршрутизации, роли пользователя и названию прикладного объекта, а затем передавать их в OpenSearch или во внешние системы.
Настройка перечня объектов доступа
Настройка перечня объектов доступа, для которых регистрируются попытки обращения, выполняется на двух уровнях:
- на уровне подписки на события RabbitMQ по
routing key; - на уровне фильтрации событий в
Logstash.
Таким образом обеспечивается возможность настройки перечня объектов доступа:
- по сервису;
- по классу прикладного объекта;
- по роли пользователя;
- по имени конкретного прикладного объекта;
- по параметрам пути запроса.
Фильтрация по ключу маршрутизации
Фильтрация по routing key задаёт перечень событий, которые Logstash получает из RabbitMQ.
Например:
#- все события;srm.#- все события сервиса СРМ;srm.user_action.dashboard.*- все события, связанные с дашбордами;srm.user_action.ci_type.*- все события, связанные с типами CI;srm.user_action.ci_type.CITypeCreated- конкретное событие, создание типа CI.
Таким образом на уровне обращения к RabbitMQ можно задать перечень прикладных объектов, для которых будут регистрироваться попытки обращения.
Фильтрация по роли пользователя
Фильтрация по ролям выполняется в блоке filter конфигурации Logstash.
Пример:
if [user_role] in ["pamir_root", "pamir_admin", "pamir_admin_ib"] {
mutate {
add_field => { "[@metadata][index_prefix]" => "pamir-sec-crud-admin-logs" }
}
} else {
mutate {
add_field => { "[@metadata][index_prefix]" => "pamir-sec-crud-user-logs" }
}
}
В этом примере:
- события пользователей с ролями
pamir_root,pamir_admin,pamir_admin_ibпомещаются в индекс административных действий; - события остальных пользователей помещаются в индекс пользовательских действий.
При необходимости можно регистрировать события только для отдельных ролей:
if [user_role] not in ["pamir_root", "pamir_admin", "pamir_admin_ib"] {
drop {}
}
Фильтрация по названию прикладного объекта
Фильтрация по названию прикладного объекта выполняется после разбора поля payload.
Для событий:
srm.user_action.ci_type.*имя объекта, как правило, находится вpayload_json.name;srm.user_action.dashboard.*имя объекта может находиться вpayload_json.titleилиpayload_json.name.
Пример разбора поля payload:
json {
source => "payload"
target => "payload_json"
skip_on_invalid_json => true
}
После этого можно использовать фильтрацию по названию объекта.
Пример фильтрации по названию типа CI:
if [event_type] =~ /^srm\.user_action\.ci_type\./ and [payload_json][name] != "audit_log" {
drop {}
}
Пример фильтрации по названию дашборда:
if [event_type] =~ /^srm\.user_action\.dashboard\./ and [payload_json][title] != "audit" {
drop {}
}
Таким образом можно регистрировать попытки обращения только к прикладным объектам с заданным именем.
Фильтрация по параметрам пути запроса
Фильтрация по параметрам пути запроса может использоваться в случаях, когда имя или идентификатор прикладного объекта передаётся в path_parameters.
Состав полей path_parameters зависит от конкретного endpoint.
Такой способ фильтрации может использоваться, например, при регистрации событий изменения дашбордов.
Пример события:
event_type:srm.user_action.dashboard.DashboardUpdated;endpoint:/v4/dashboard/{dashboard_name};path_parameters.dashboard_name:audit.
Пример фильтрации по имени дашборда:
if [event_type] =~ /^srm\.user_action\.dashboard\./ and [path_parameters][dashboard_name] != "audit" {
drop {}
}
Таким образом можно регистрировать попытки обращения только к прикладным объектам, параметры которых передаются в пути запроса.
Передача событий во встроенный OpenSearch
OpenSearch входит в состав дистрибутива Памир и используется системой для хранения и поиска событий аудита.
В приведённой конфигурации Logstash:
- административные события записываются в индексы вида
pamir-sec-crud-admin-logs-YYYY.MM.dd; - пользовательские события записываются в индексы вида
pamir-sec-crud-user-logs-YYYY.MM.dd.
Разделение по индексам выполняется на основании поля user_role.
Такой подход позволяет:
- отделить административные действия от пользовательских;
- упростить поиск событий;
- упростить настройку хранения данных.
Передача событий во внешние системы
Для передачи событий аудита во внешние системы в конфигурации Logstash предусмотрены отдельные блоки output.
Для включения интеграции необходимо:
- Добавить нужный блок
output; - задать параметры подключения в файле окружения, подключённом через
env_file; - при необходимости добавить сертификаты и иные файлы конфигурации.
Передача по Syslog
Передача событий аудита на внешний сервер сбора и анализа событий безопасности может быть выполнена по протоколу syslog.
В примере конфигурации используется защищённый вариант TLS/TCP.
Передача по REST API
Передача событий аудита во внешний сервис может быть выполнена по REST API.
В примере конфигурации используется протокол HTTPS с проверкой сертификата сервера.
Передача по JDBC
Передача событий аудита во внешнюю базу данных может быть выполнена по JDBC.
Данный вариант требует отдельной настройки драйвера и должен использоваться только при наличии соответствующей инфраструктуры.
Передача событий в локальный файл
Для сохранения журналов ИБ на локальный диск в Logstash может использоваться плагин file.
Подключение каталога для журналов
Для записи журналов в локальные файлы необходимо добавить в docker-compose.additional.yml отдельный volume с каталогом для хранения логов.
Пример:
logstash:
volumes:
- /home/pamir/deploy/data/logstash-core/logs:/var/log/logstash
В приведённом примере:
- каталог
/home/pamir/deploy/data/logstash-core/logsрасположен на узле с Docker; - каталог
/var/log/logstashдоступен внутри контейнераlogstash; - файлы журналов записываются в смонтированный каталог на узле.
Для отдельных видов журналов рекомендуется использовать отдельные подкаталоги, например pamir-sec-user-activity.
Настройка output
Для записи событий в локальный файл необходимо добавить в pipeline блок output.
Пример:
output {
file {
path => "/var/log/logstash/pamir-sec-user-activity/pamir-sec-user-activity-%{+YYYY-MM-dd}.log"
codec => json_lines
flush_interval => 1
create_if_deleted => true
}
}
В приведённой конфигурации:
pathзадаёт путь к файлу журналов;%{+YYYY-MM-dd}в имени файла обеспечивает разбиение файлов по дате события;codec => json_linesзадаёт запись каждого события отдельной строкой в формате JSON;flush_interval => 1задаёт интервал сброса буфера на диск;create_if_deleted => trueуказывает, что при удалении файла он будет создан повторно при поступлении следующего события.
Результат работы
При такой настройке Logstash записывает события в локальные файлы вида:
pamir-sec-user-activity-YYYY-MM-dd.log.
Таким образом обеспечивается разбиение журналов по дням.
Особенности использования
Данная настройка обеспечивает разбиение файлов по дате, но не включает полноценную ротацию и политику хранения файлов средствами операционной системы.
Это означает, что:
- старые файлы не удаляются автоматически;
- сжатие файлов не выполняется автоматически;
- ротация по размеру файла не выполняется автоматически.
При необходимости удаление старых файлов, сжатие и ротация по размеру должны быть дополнительно настроены средствами операционной системы.
Альтернативная передача событий с помощью vector
Для передачи событий аудита также может использоваться утилита vector.
Подключение утилиты vector
Для подключения утилиты vector в виде сервиса, необходимо в файл docker-compose.additional.yml добавить следующие строки:
vector:
image: timberio/vector:latest-alpine
networks: [pamir-net]
profiles: [all, additional]
restart: unless-stopped
env_file:
- .env
volumes:
- type: bind
source: ./data/vector
target: /etc/vector
command: ["--config", "/etc/vector/vector.toml"]
Настройка утилиты vector
Конфигурацию сервиса vector нужно записать в файл vector.toml. Путь к файлу vector.toml необходимо указать в файле docker-compose.additional.yml, в volumes.
Пример конфигурации:
[sources.rabbitmq]
type = "amqp"
connection_string = "amqp://${RABBITMQ_USER}:${RABBITMQ_PASSWORD}@${RABBITMQ_HOST}:5672/${RABBITMQ_VHOST}"
queue = "vector"
consumer = "vector"
[transforms.parse_json]
type = "remap"
inputs = ["rabbitmq"]
source = '''
parsed, err = parse_json(.message)
. = {
"event": .routing,
"timestamp": parsed.timestamp,
"user_id": parsed.user_id,
"user_login": parsed.user_login,
"endpoint": parsed.endpoint,
"parameters": parsed.parameters,
"path_parameters": parsed.path_parameters,
"metadata": parsed.metadata,
"response_status_code": parsed.response_status_code,
}
'''
[sinks.file]
type = "file"
inputs = ["parse_json"]
path = "/etc/vector/log/audit-%Y-%m-%d.log"
encoding.codec = "json"
[sinks.console]
type = "console"
inputs = ["parse_json"]
encoding.codec = "json"
Значения переменным RABBITMQ_USER, RABBITMQ_PASSWORD, RABBITMQ_HOST и RABBITMQ_VHOST присваиваются в файле .env.
Настройка RabbitMQ для vector
В RabbitMQ необходимо создать очередь vector. В веб интерфейсе RabbitMQ сделать это можно следующим образом: на вкладке Queues выберите Add a new queue и задайте следующие параметры:
- Type: Classic;
- Name:
vector; - Durability: Durable;
- Auto delete: No.
Выберите Add queue, будет создана очередь vector.
После создания очереди необходимо настроить маршрутизацию сообщений. Для этого на вкладке Queues, выберите очередь vector, на открывшейся странице Queue vector выберите Bindings и задайте следующие параметры:
- From exchange:
pamir_audit_exchange; - Routing key:
#.
Выберите Bind, маршрутизация настроена.
Routing key (Ключ маршрутизации) - это строка, которая поддерживает символы подстановки:
*- заменяет одно слово;#- заменяет несколько слов или их отсутствие.
Перечень ключей маршрутизации приведён в документе Действия в системе.
Принцип работы vector
RabbitMQ принимает сообщения Памир. vector подключается к RabbitMQ, читает сообщения из очереди vector, разбирает их согласно transforms.parse_json и сохраняет их в файл ./data/vector/log/audit-%Y-%m-%d.log в формате JSON.