Перейти к основному содержимому
Версия: 2.0 (WIP)

Передача событий аудита

События аудита Памир передаются через RabbitMQ и могут быть обработаны встроенным в систему Logstash.

Описание формирования событий аудита и перечень ключей маршрутизации приведены в документе Действия в системе.

Принцип работы

RabbitMQ принимает сообщения Памир и публикует их в exchange, например pamir_audit_exchange.

Logstash подключается к RabbitMQ, создает или использует очереди для аудита, извлекает нужные события по ключу маршрутизации, фильтрует их по роли пользователя и по названию прикладного объекта, после чего:

  • передает события во встроенный OpenSearch;
  • при необходимости передает события во внешние системы.

Схема передачи событий:

Использование Logstash

Logstash входит в состав дистрибутива Памир и используется для получения событий из RabbitMQ, их фильтрации и последующей передачи:

  • во встроенный OpenSearch;
  • во внешние системы.

Для добавления собственных pipeline необходимо в файл docker-compose.additional.yml добавить дополнительные volume для файлов конфигурации pipeline и файла pipelines.yml.

Пример:

  logstash:
env_file:
- .opensearch.env
volumes:
- /home/pamir/deploy/data/logstash-core/pipeline/pamir-audit.conf:/usr/share/logstash/pipeline/pamir-audit.conf
- /home/pamir/deploy/data/logstash-core/pipeline/fesb-trace.conf:/usr/share/logstash/pipeline/fesb-trace.conf
- /home/pamir/deploy/data/logstash-core/pipeline/pamir-user-activity.conf:/usr/share/logstash/pipeline/pamir-user-activity.conf
- /home/pamir/deploy/data/logstash-core/pipelines.yml:/usr/share/logstash/config/pipelines.yml

Параметры подключения для pipeline Logstash задаются в файле окружения, подключённом через env_file.

Пример переменных окружения:

RABBITMQ_HOST=rabbitmq
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_VHOST=vhost

OPENSEARCH_LOGSTASH_PASSWORD=pswd

Настройка pipeline Logstash

Конфигурацию pipeline необходимо записать в отдельный файл, например pamir-audit.conf, и подключить его через volumes и pipelines.yml.

Пример конфигурации:

input {
rabbitmq {
host => "${RABBITMQ_HOST:rabbitmq}"
port => "${RABBITMQ_PORT:5672}"
user => "${RABBITMQ_USER}"
password => "${RABBITMQ_PASSWORD}"
vhost => "${RABBITMQ_VHOST:vhost}"

# События {vars.name} в exchange RabbitMQ типа topic
exchange => "pamir_audit_exchange"
exchange_type => "topic"
durable => true

# Routing key для фильтрации сообщений.
# Перечень событий и ключей маршрутизации приведён в разделе Действия в системе.
key => "#"

# Logstash создаёт очередь автоматически, если она отсутствует.
queue => "logstash-audit-queue"

auto_delete => false
exclusive => false
durable => true

ack => true
prefetch_count => 200
passive => false

metadata_enabled => "none"

codec => json { ecs_compatibility => "disabled" }
}
}

filter {
date {
match => ["timestamp", "ISO8601"]
target => "@timestamp"
}

mutate {
remove_field => ["timestamp"]
}

# Разделение событий по ролям пользователей.
if [user_role] in ["pamir_root", "pamir_admin", "pamir_admin_ib"] {
mutate {
add_field => { "[@metadata][index_prefix]" => "pamir-sec-crud-admin-logs" }
}
} else {
mutate {
add_field => { "[@metadata][index_prefix]" => "pamir-sec-crud-user-logs" }
}
}
}

output {
# --- OpenSearch ---
# Передача событий аудита в OpenSearch для централизованного хранения, поиска и последующего анализа.
opensearch {
hosts => ["https://opensearch:9200"]
auth_type => {
type => "basic"
user => "pamir-logstash-user"
password => "${OPENSEARCH_LOGSTASH_PASSWORD}"
}
ssl_certificate_verification => false
index => "%{[@metadata][index_prefix]}-%{+YYYY.MM.dd}"
manage_template => false
}

# --- SYSLOG ---
# Передача событий аудита на внешний сервер сбора и анализа
syslog {
id => "audit_syslog_tls"
host => "${SYSLOG_HOST:syslog.example.local}"
port => "${SYSLOG_PORT:6514}"
protocol => "ssl-tcp"
rfc => "rfc5424"
appname => "pamir-audit"
facility => "local0"
severity => "informational"
sourcehost => "%{[host]}"
message => '{"event_type":"%{[event_type]}","user_id":"%{[user_id]}","user_login":"%{[user_login]}","user_role":"%{[user_role]}","endpoint":"%{[endpoint]}","parameters":"%{[parameters]}","response_status_code":"%{[response_status_code]}","@timestamp":"%{[@timestamp]}"}'
ssl_certificate_authorities => ["/usr/share/logstash/config/certs/syslog-ca.crt"]
ssl_verify => true
}

# --- REST API ---
# Передача событий аудита во внешний сервис по REST API с использованием HTTPS.
http {
id => "audit_rest_https"
url => "https://audit-collector.example.local/api/v1/audit"
http_method => "post"
format => "json"
content_type => "application/json"
headers => {
"Authorization" => "Bearer ${AUDIT_API_TOKEN}"
"X-Source-System" => "pamir"
}
automatic_retries => 2
retry_failed => true
ssl_enabled => true
ssl_verification_mode => "full"
ssl_certificate_authorities => ["/usr/share/logstash/config/certs/api-ca.crt"]
}

# --- JDBC ---
# Передача событий аудита во внешнюю реляционную базу данных по JDBC.

jdbc {
driver_class => "org.postgresql.Driver"
connection_string => "jdbc:postgresql://postgres:5432/auditdb?ssl=true&sslmode=verify-full"
username => "${JDBC_USER}"
password => "${JDBC_PASSWORD}"
statement => [
"INSERT INTO audit_events (event_time, event_type, user_id, user_login, user_role, endpoint, parameters, response_status_code, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
"@timestamp",
"event_type",
"user_id",
"user_login",
"user_role",
"endpoint",
"parameters",
"response_status_code",
"payload"
]
}
}

В приведённой конфигурации:

  • блок input.rabbitmq подключает Logstash к RabbitMQ и получает события из exchange pamir_audit_exchange через очередь logstash-audit-queue;
  • параметр key задаёт перечень ключей маршрутизации, по которым будут отбираться события аудита;
  • блок filter.date преобразует время события в поле @timestamp;
  • блок filter.mutate удаляет исходное поле timestamp после преобразования;
  • блок filter с проверкой user_role разделяет события административных пользователей и остальных пользователей по разным индексам OpenSearch;
  • блок output.opensearch передаёт события во встроенный OpenSearch;
  • дополнительные блоки output могут использоваться для передачи событий во внешние системы.

Таким образом конфигурация позволяет получать события аудита из RabbitMQ, фильтровать их по ключу маршрутизации, роли пользователя и названию прикладного объекта, а затем передавать их в OpenSearch или во внешние системы.

Настройка перечня объектов доступа

Настройка перечня объектов доступа, для которых регистрируются попытки обращения, выполняется на двух уровнях:

  1. на уровне подписки на события RabbitMQ по routing key;
  2. на уровне фильтрации событий в Logstash.

Таким образом обеспечивается возможность настройки перечня объектов доступа:

  • по сервису;
  • по классу прикладного объекта;
  • по роли пользователя;
  • по имени конкретного прикладного объекта;
  • по параметрам пути запроса.

Фильтрация по ключу маршрутизации

Фильтрация по routing key задаёт перечень событий, которые Logstash получает из RabbitMQ.

Например:

  • # - все события;
  • srm.# - все события сервиса СРМ;
  • srm.user_action.dashboard.* - все события, связанные с дашбордами;
  • srm.user_action.ci_type.* - все события, связанные с типами CI;
  • srm.user_action.ci_type.CITypeCreated - конкретное событие, создание типа CI.

Таким образом на уровне обращения к RabbitMQ можно задать перечень прикладных объектов, для которых будут регистрироваться попытки обращения.

Фильтрация по роли пользователя

Фильтрация по ролям выполняется в блоке filter конфигурации Logstash.

Пример:

if [user_role] in ["pamir_root", "pamir_admin", "pamir_admin_ib"] {
mutate {
add_field => { "[@metadata][index_prefix]" => "pamir-sec-crud-admin-logs" }
}
} else {
mutate {
add_field => { "[@metadata][index_prefix]" => "pamir-sec-crud-user-logs" }
}
}

В этом примере:

  • события пользователей с ролями pamir_root, pamir_admin, pamir_admin_ib помещаются в индекс административных действий;
  • события остальных пользователей помещаются в индекс пользовательских действий.

При необходимости можно регистрировать события только для отдельных ролей:

if [user_role] not in ["pamir_root", "pamir_admin", "pamir_admin_ib"] {
drop {}
}

Фильтрация по названию прикладного объекта

Фильтрация по названию прикладного объекта выполняется после разбора поля payload.

Для событий:

  • srm.user_action.ci_type.* имя объекта, как правило, находится в payload_json.name;
  • srm.user_action.dashboard.* имя объекта может находиться в payload_json.title или payload_json.name.

Пример разбора поля payload:

json {
source => "payload"
target => "payload_json"
skip_on_invalid_json => true
}

После этого можно использовать фильтрацию по названию объекта.

Пример фильтрации по названию типа CI:

if [event_type] =~ /^srm\.user_action\.ci_type\./ and [payload_json][name] != "audit_log" {
drop {}
}

Пример фильтрации по названию дашборда:

if [event_type] =~ /^srm\.user_action\.dashboard\./ and [payload_json][title] != "audit" {
drop {}
}

Таким образом можно регистрировать попытки обращения только к прикладным объектам с заданным именем.

Фильтрация по параметрам пути запроса

Фильтрация по параметрам пути запроса может использоваться в случаях, когда имя или идентификатор прикладного объекта передаётся в path_parameters.

Состав полей path_parameters зависит от конкретного endpoint.

Такой способ фильтрации может использоваться, например, при регистрации событий изменения дашбордов.

Пример события:

  • event_type: srm.user_action.dashboard.DashboardUpdated;
  • endpoint: /v4/dashboard/{dashboard_name};
  • path_parameters.dashboard_name: audit.

Пример фильтрации по имени дашборда:

if [event_type] =~ /^srm\.user_action\.dashboard\./ and [path_parameters][dashboard_name] != "audit" {
drop {}
}

Таким образом можно регистрировать попытки обращения только к прикладным объектам, параметры которых передаются в пути запроса.

Передача событий во встроенный OpenSearch

OpenSearch входит в состав дистрибутива Памир и используется системой для хранения и поиска событий аудита.

В приведённой конфигурации Logstash:

  • административные события записываются в индексы вида pamir-sec-crud-admin-logs-YYYY.MM.dd;
  • пользовательские события записываются в индексы вида pamir-sec-crud-user-logs-YYYY.MM.dd.

Разделение по индексам выполняется на основании поля user_role.

Такой подход позволяет:

  • отделить административные действия от пользовательских;
  • упростить поиск событий;
  • упростить настройку хранения данных.

Передача событий во внешние системы

Для передачи событий аудита во внешние системы в конфигурации Logstash предусмотрены отдельные блоки output.

Для включения интеграции необходимо:

  1. Добавить нужный блок output;
  2. задать параметры подключения в файле окружения, подключённом через env_file;
  3. при необходимости добавить сертификаты и иные файлы конфигурации.

Передача по Syslog

Передача событий аудита на внешний сервер сбора и анализа событий безопасности может быть выполнена по протоколу syslog.

В примере конфигурации используется защищённый вариант TLS/TCP.

Передача по REST API

Передача событий аудита во внешний сервис может быть выполнена по REST API.

В примере конфигурации используется протокол HTTPS с проверкой сертификата сервера.

Передача по JDBC

Передача событий аудита во внешнюю базу данных может быть выполнена по JDBC.

Данный вариант требует отдельной настройки драйвера и должен использоваться только при наличии соответствующей инфраструктуры.

Передача событий в локальный файл

Для сохранения журналов ИБ на локальный диск в Logstash может использоваться плагин file.

Подключение каталога для журналов

Для записи журналов в локальные файлы необходимо добавить в docker-compose.additional.yml отдельный volume с каталогом для хранения логов.

Пример:

  logstash:
volumes:
- /home/pamir/deploy/data/logstash-core/logs:/var/log/logstash

В приведённом примере:

  • каталог /home/pamir/deploy/data/logstash-core/logs расположен на узле с Docker;
  • каталог /var/log/logstash доступен внутри контейнера logstash;
  • файлы журналов записываются в смонтированный каталог на узле.

Для отдельных видов журналов рекомендуется использовать отдельные подкаталоги, например pamir-sec-user-activity.

Настройка output

Для записи событий в локальный файл необходимо добавить в pipeline блок output.

Пример:

output {
file {
path => "/var/log/logstash/pamir-sec-user-activity/pamir-sec-user-activity-%{+YYYY-MM-dd}.log"
codec => json_lines
flush_interval => 1
create_if_deleted => true
}
}

В приведённой конфигурации:

  • path задаёт путь к файлу журналов;
  • %{+YYYY-MM-dd} в имени файла обеспечивает разбиение файлов по дате события;
  • codec => json_lines задаёт запись каждого события отдельной строкой в формате JSON;
  • flush_interval => 1 задаёт интервал сброса буфера на диск;
  • create_if_deleted => true указывает, что при удалении файла он будет создан повторно при поступлении следующего события.

Результат работы

При такой настройке Logstash записывает события в локальные файлы вида:

  • pamir-sec-user-activity-YYYY-MM-dd.log.

Таким образом обеспечивается разбиение журналов по дням.

Особенности использования

Данная настройка обеспечивает разбиение файлов по дате, но не включает полноценную ротацию и политику хранения файлов средствами операционной системы.

Это означает, что:

  • старые файлы не удаляются автоматически;
  • сжатие файлов не выполняется автоматически;
  • ротация по размеру файла не выполняется автоматически.

При необходимости удаление старых файлов, сжатие и ротация по размеру должны быть дополнительно настроены средствами операционной системы.

Альтернативная передача событий с помощью vector

Для передачи событий аудита также может использоваться утилита vector.

Подключение утилиты vector

Для подключения утилиты vector в виде сервиса, необходимо в файл docker-compose.additional.yml добавить следующие строки:

  vector: 
image: timberio/vector:latest-alpine
networks: [pamir-net]
profiles: [all, additional]
restart: unless-stopped
env_file:
- .env
volumes:
- type: bind
source: ./data/vector
target: /etc/vector
command: ["--config", "/etc/vector/vector.toml"]

Настройка утилиты vector

Конфигурацию сервиса vector нужно записать в файл vector.toml. Путь к файлу vector.toml необходимо указать в файле docker-compose.additional.yml, в volumes.

Пример конфигурации:

[sources.rabbitmq]
type = "amqp"
connection_string = "amqp://${RABBITMQ_USER}:${RABBITMQ_PASSWORD}@${RABBITMQ_HOST}:5672/${RABBITMQ_VHOST}"
queue = "vector"
consumer = "vector"

[transforms.parse_json]
type = "remap"
inputs = ["rabbitmq"]
source = '''
parsed, err = parse_json(.message)

. = {
"event": .routing,
"timestamp": parsed.timestamp,
"user_id": parsed.user_id,
"user_login": parsed.user_login,
"endpoint": parsed.endpoint,
"parameters": parsed.parameters,
"path_parameters": parsed.path_parameters,
"metadata": parsed.metadata,
"response_status_code": parsed.response_status_code,
}

'''

[sinks.file]
type = "file"
inputs = ["parse_json"]
path = "/etc/vector/log/audit-%Y-%m-%d.log"
encoding.codec = "json"

[sinks.console]
type = "console"
inputs = ["parse_json"]
encoding.codec = "json"

Значения переменным RABBITMQ_USER, RABBITMQ_PASSWORD, RABBITMQ_HOST и RABBITMQ_VHOST присваиваются в файле .env.

Настройка RabbitMQ для vector

В RabbitMQ необходимо создать очередь vector. В веб интерфейсе RabbitMQ сделать это можно следующим образом: на вкладке Queues выберите Add a new queue и задайте следующие параметры:

  • Type: Classic;
  • Name: vector;
  • Durability: Durable;
  • Auto delete: No.

Выберите Add queue, будет создана очередь vector.

После создания очереди необходимо настроить маршрутизацию сообщений. Для этого на вкладке Queues, выберите очередь vector, на открывшейся странице Queue vector выберите Bindings и задайте следующие параметры:

  • From exchange: pamir_audit_exchange;
  • Routing key: #.

Выберите Bind, маршрутизация настроена.

Routing key (Ключ маршрутизации) - это строка, которая поддерживает символы подстановки:

  • * - заменяет одно слово;
  • # - заменяет несколько слов или их отсутствие.

Перечень ключей маршрутизации приведён в документе Действия в системе.

Принцип работы vector

RabbitMQ принимает сообщения Памир. vector подключается к RabbitMQ, читает сообщения из очереди vector, разбирает их согласно transforms.parse_json и сохраняет их в файл ./data/vector/log/audit-%Y-%m-%d.log в формате JSON.