Engine - Development of PoC for Correlation Rules using OpenSearch SQL Plugin #23510
Update

Following the second approach explained here (the detailed algorithm will be posted once refined), a PoC in Python is ready with basic metrics to measure network usage and indexer query times. Detailed indexer impact (such as CPU, memory, ...) will need to be measured as well, using the Metrics or the profile endpoints. References:
Example output of the current implementation:

****************** Initial Query ******************
Timestamp: 1970-01-01T00:00:00
+----+--------------------------+----------+
| id | ingested | srcip |
+----+--------------------------+----------+
| 3 | 2024-05-08T01:20:09.000Z | 10.0.0.1 |
| 7 | 2024-05-08T01:20:21.000Z | 10.0.0.2 |
| 12 | 2024-05-08T01:20:21.000Z | 10.0.0.3 |
| 15 | 2024-05-08T01:21:04.000Z | 10.0.0.4 |
| 18 | 2024-05-08T01:21:09.000Z | 10.0.0.4 |
| 25 | 2024-05-08T01:22:06.000Z | 10.0.0.5 |
| 26 | 2024-05-08T01:22:07.000Z | 10.0.0.5 |
+----+--------------------------+----------+
New Timestamp: 2024-05-08T01:22:07
************************************
****************** Sequences ******************
Dynamic: 10.0.0.1
+----+--------------------------+----------+
| id | ingested | srcip |
+----+--------------------------+----------+
| 3 | 2024-05-08T01:20:09.000Z | 10.0.0.1 |
+----+--------------------------+----------+
----- Group -----
+----+--------------------------+----------+
| id | ingested | srcip |
+----+--------------------------+----------+
| 3 | 2024-05-08T01:20:09.000Z | 10.0.0.1 |
+----+--------------------------+----------+
Upper bound: 2024-05-08T01:20:09
Lower bound: 2024-05-08T01:19:49
Query for step 0: SELECT test.id, event.ingested, test.srcip FROM wazuh-alerts-5.x-* AS idx WHERE (test.category = "t1") AND (test.srcip IS NOT NULL)AND (test.action = "logon-failed") AND (test.srcip = "10.0.0.1") AND event.ingested > '2024-05-08T01:19:49' AND event.ingested < '2024-05-08T01:20:09' ORDER BY event.ingested ASC;
----- Group -----
+----+--------------------------+----------+
| id | ingested | srcip |
+----+--------------------------+----------+
| 0 | 2024-05-08T01:20:00.000Z | 10.0.0.1 |
| 1 | 2024-05-08T01:20:03.000Z | 10.0.0.1 |
| 2 | 2024-05-08T01:20:06.000Z | 10.0.0.1 |
+----+--------------------------+----------+
Sequence found!
+----+
| id |
+----+
| 3 |
| 0 |
| 1 |
| 2 |
+----+
Dynamic: 10.0.0.2
+----+--------------------------+----------+
| id | ingested | srcip |
+----+--------------------------+----------+
| 7 | 2024-05-08T01:20:21.000Z | 10.0.0.2 |
+----+--------------------------+----------+
----- Group -----
+----+--------------------------+----------+
| id | ingested | srcip |
+----+--------------------------+----------+
| 7 | 2024-05-08T01:20:21.000Z | 10.0.0.2 |
+----+--------------------------+----------+
Upper bound: 2024-05-08T01:20:21
Lower bound: 2024-05-08T01:20:01
Query for step 0: SELECT test.id, event.ingested, test.srcip FROM wazuh-alerts-5.x-* AS idx WHERE (test.category = "t1") AND (test.srcip IS NOT NULL)AND (test.action = "logon-failed") AND (test.srcip = "10.0.0.2") AND event.ingested > '2024-05-08T01:20:01' AND event.ingested < '2024-05-08T01:20:21' ORDER BY event.ingested ASC;
Not enough events for step 0
Dynamic: 10.0.0.3
+----+--------------------------+----------+
| id | ingested | srcip |
+----+--------------------------+----------+
| 12 | 2024-05-08T01:20:21.000Z | 10.0.0.3 |
+----+--------------------------+----------+
----- Group -----
+----+--------------------------+----------+
| id | ingested | srcip |
+----+--------------------------+----------+
| 12 | 2024-05-08T01:20:21.000Z | 10.0.0.3 |
+----+--------------------------+----------+
Upper bound: 2024-05-08T01:20:21
Lower bound: 2024-05-08T01:20:01
Query for step 0: SELECT test.id, event.ingested, test.srcip FROM wazuh-alerts-5.x-* AS idx WHERE (test.category = "t1") AND (test.srcip IS NOT NULL)AND (test.action = "logon-failed") AND (test.srcip = "10.0.0.3") AND event.ingested > '2024-05-08T01:20:01' AND event.ingested < '2024-05-08T01:20:21' ORDER BY event.ingested ASC;
----- Group -----
+----+--------------------------+----------+
| id | ingested | srcip |
+----+--------------------------+----------+
| 9 | 2024-05-08T01:20:13.000Z | 10.0.0.3 |
| 10 | 2024-05-08T01:20:16.000Z | 10.0.0.3 |
| 11 | 2024-05-08T01:20:16.000Z | 10.0.0.3 |
+----+--------------------------+----------+
Sequence found!
+----+
| id |
+----+
| 12 |
| 9 |
| 10 |
| 11 |
+----+
Dynamic: 10.0.0.4
+----+--------------------------+----------+
| id | ingested | srcip |
+----+--------------------------+----------+
| 15 | 2024-05-08T01:21:04.000Z | 10.0.0.4 |
| 18 | 2024-05-08T01:21:09.000Z | 10.0.0.4 |
+----+--------------------------+----------+
----- Group -----
+----+--------------------------+----------+
| id | ingested | srcip |
+----+--------------------------+----------+
| 15 | 2024-05-08T01:21:04.000Z | 10.0.0.4 |
+----+--------------------------+----------+
Upper bound: 2024-05-08T01:21:04
Lower bound: 2024-05-08T01:20:44
Query for step 0: SELECT test.id, event.ingested, test.srcip FROM wazuh-alerts-5.x-* AS idx WHERE (test.category = "t1") AND (test.srcip IS NOT NULL)AND (test.action = "logon-failed") AND (test.srcip = "10.0.0.4") AND event.ingested > '2024-05-08T01:20:44' AND event.ingested < '2024-05-08T01:21:04' ORDER BY event.ingested ASC;
Not enough events for step 0
----- Group -----
+----+--------------------------+----------+
| id | ingested | srcip |
+----+--------------------------+----------+
| 18 | 2024-05-08T01:21:09.000Z | 10.0.0.4 |
+----+--------------------------+----------+
Upper bound: 2024-05-08T01:21:09
Lower bound: 2024-05-08T01:20:44
Query for step 0: SELECT test.id, event.ingested, test.srcip FROM wazuh-alerts-5.x-* AS idx WHERE (test.category = "t1") AND (test.srcip IS NOT NULL)AND (test.action = "logon-failed") AND (test.srcip = "10.0.0.4") AND event.ingested > '2024-05-08T01:20:44' AND event.ingested < '2024-05-08T01:21:09' ORDER BY event.ingested ASC;
----- Group -----
+----+--------------------------+----------+
| id | ingested | srcip |
+----+--------------------------+----------+
| 13 | 2024-05-08T01:21:00.000Z | 10.0.0.4 |
| 14 | 2024-05-08T01:21:03.000Z | 10.0.0.4 |
| 16 | 2024-05-08T01:21:05.000Z | 10.0.0.4 |
+----+--------------------------+----------+
Sequence found!
+----+
| id |
+----+
| 18 |
| 13 |
| 14 |
| 16 |
+----+
Dynamic: 10.0.0.5
+----+--------------------------+----------+
| id | ingested | srcip |
+----+--------------------------+----------+
| 25 | 2024-05-08T01:22:06.000Z | 10.0.0.5 |
| 26 | 2024-05-08T01:22:07.000Z | 10.0.0.5 |
+----+--------------------------+----------+
----- Group -----
+----+--------------------------+----------+
| id | ingested | srcip |
+----+--------------------------+----------+
| 25 | 2024-05-08T01:22:06.000Z | 10.0.0.5 |
+----+--------------------------+----------+
Upper bound: 2024-05-08T01:22:06
Lower bound: 2024-05-08T01:21:46
Query for step 0: SELECT test.id, event.ingested, test.srcip FROM wazuh-alerts-5.x-* AS idx WHERE (test.category = "t1") AND (test.srcip IS NOT NULL)AND (test.action = "logon-failed") AND (test.srcip = "10.0.0.5") AND event.ingested > '2024-05-08T01:21:46' AND event.ingested < '2024-05-08T01:22:06' ORDER BY event.ingested ASC;
----- Group -----
+----+--------------------------+----------+
| id | ingested | srcip |
+----+--------------------------+----------+
| 19 | 2024-05-08T01:22:00.000Z | 10.0.0.5 |
| 20 | 2024-05-08T01:22:01.000Z | 10.0.0.5 |
| 21 | 2024-05-08T01:22:02.000Z | 10.0.0.5 |
+----+--------------------------+----------+
Sequence found!
+----+
| id |
+----+
| 25 |
| 19 |
| 20 |
| 21 |
+----+
************************************
^C
****************** Metrics ******************
Total queries: 7
Total bytes sent: 2397
Total bytes received: 7022
Total time: 0.2047252655029297 seconds
Mean time: 0.02924646650041853 seconds
Mean bytes sent: 342.42857142857144
Mean bytes received: 1003.1428571428571
Query 1
SELECT test.id, event.ingested, test.srcip FROM wazuh-alerts-5.x-* AS idx WHERE (test.category = "t1") AND (test.srcip IS NOT NULL)AND (test.action = "logon-success") AND event.ingested > '1970-01-01T00:00:00' ORDER BY event.ingested ASC;
Bytes sent: 279
Bytes received: 1641
Time: 0.029362201690673828 seconds
Query 2
SELECT test.id, event.ingested, test.srcip FROM wazuh-alerts-5.x-* AS idx WHERE (test.category = "t1") AND (test.srcip IS NOT NULL)AND (test.action = "logon-failed") AND (test.srcip = "10.0.0.1") AND event.ingested > '2024-05-08T01:19:49' AND event.ingested < '2024-05-08T01:20:09' ORDER BY event.ingested ASC;
Bytes sent: 353
Bytes received: 827
Time: 0.034539222717285156 seconds
Query 3
SELECT test.id, event.ingested, test.srcip FROM wazuh-alerts-5.x-* AS idx WHERE (test.category = "t1") AND (test.srcip IS NOT NULL)AND (test.action = "logon-failed") AND (test.srcip = "10.0.0.2") AND event.ingested > '2024-05-08T01:20:01' AND event.ingested < '2024-05-08T01:20:21' ORDER BY event.ingested ASC;
Bytes sent: 353
Bytes received: 625
Time: 0.028973817825317383 seconds
Query 4
SELECT test.id, event.ingested, test.srcip FROM wazuh-alerts-5.x-* AS idx WHERE (test.category = "t1") AND (test.srcip IS NOT NULL)AND (test.action = "logon-failed") AND (test.srcip = "10.0.0.3") AND event.ingested > '2024-05-08T01:20:01' AND event.ingested < '2024-05-08T01:20:21' ORDER BY event.ingested ASC;
Bytes sent: 353
Bytes received: 829
Time: 0.029796361923217773 seconds
Query 5
SELECT test.id, event.ingested, test.srcip FROM wazuh-alerts-5.x-* AS idx WHERE (test.category = "t1") AND (test.srcip IS NOT NULL)AND (test.action = "logon-failed") AND (test.srcip = "10.0.0.4") AND event.ingested > '2024-05-08T01:20:44' AND event.ingested < '2024-05-08T01:21:04' ORDER BY event.ingested ASC;
Bytes sent: 353
Bytes received: 627
Time: 0.029517650604248047 seconds
Query 6
SELECT test.id, event.ingested, test.srcip FROM wazuh-alerts-5.x-* AS idx WHERE (test.category = "t1") AND (test.srcip IS NOT NULL)AND (test.action = "logon-failed") AND (test.srcip = "10.0.0.4") AND event.ingested > '2024-05-08T01:20:44' AND event.ingested < '2024-05-08T01:21:09' ORDER BY event.ingested ASC;
Bytes sent: 353
Bytes received: 1033
Time: 0.025689125061035156 seconds
Query 7
SELECT test.id, event.ingested, test.srcip FROM wazuh-alerts-5.x-* AS idx WHERE (test.category = "t1") AND (test.srcip IS NOT NULL)AND (test.action = "logon-failed") AND (test.srcip = "10.0.0.5") AND event.ingested > '2024-05-08T01:21:46' AND event.ingested < '2024-05-08T01:22:06' ORDER BY event.ingested ASC;
Bytes sent: 353
Bytes received: 1440
Time: 0.026846885681152344 seconds
************************************
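Each per-group query above bounds `event.ingested` to a window that ends at the candidate event's timestamp and starts `timeframe` seconds earlier (20 s in this run). A minimal sketch of that window computation (the `query_bounds` helper name is hypothetical, not part of the PoC):

```python
from datetime import datetime, timedelta

def query_bounds(ingested_iso: str, timeframe_s: int) -> tuple:
    """Derive the per-group query window (lower, upper) from a candidate
    event's `event.ingested` timestamp and the rule timeframe in seconds."""
    upper = datetime.fromisoformat(ingested_iso.replace('Z', '+00:00'))
    lower = upper - timedelta(seconds=timeframe_s)
    fmt = '%Y-%m-%dT%H:%M:%S'
    return lower.strftime(fmt), upper.strftime(fmt)

# Reproduces the bounds of the first group above (timeframe of 20 s):
print(query_bounds("2024-05-08T01:20:09.000Z", 20))
# → ('2024-05-08T01:19:49', '2024-05-08T01:20:09')
```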
Daily update
Next steps:
cc: @JavierBejMen
Daily Update
Daily update
- name: Potential DNS Tunneling via NsLookup
  check:
    - event.category: process
    - host.os.type: windows
  group_by:
    - host.id
  timeframe: 300
  sequence:
    - frequency: 10
      check:
        - event.type: start
        - process.name: nslookup.exe

- name: Potential Command and Control via Internet Explorer
  check:
    - host.os.type: windows
  group_by:
    - host.id
    - user.name
  timeframe: 5
  sequence:
    - check:
        - event.category: library
        - dll.name: IEProxy.dll
      frequency: 1
    - check:
        - event.category: process
        - event.type: start
        - process.parent.name: iexplore.exe
        - process.parent.args: -Embedding
      frequency: 1
    - check:
        - event.category: network
        - network.protocol: dns
        - process.name: iexplore.exe
      frequency: 5

- name: Multiple Logon Failure Followed by Logon Success
  check:
    - event.category: authentication
  group_by:
    - source.ip
    - user.name
  timeframe: 5
  sequence:
    - check:
        - event.action: logon-failed
      frequency: 5
    - check:
        - event.action: logon-success
      frequency: 1
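A rule like the ones above combines a rule-level `check`, the `group_by` fields, and the per-step `check`s into a single SQL WHERE condition, as the PoC below does in `Rule._get_condition`. A minimal sketch of that combination (`build_condition` is an illustrative helper, not part of the PoC):

```python
def build_condition(rule: dict) -> str:
    """Sketch: static fields become `field = "value"`, group-by fields become
    `field IS NOT NULL`, and step conditions are OR-ed together."""
    static = [f'{k} = "{v}"' for c in rule.get("check", []) for k, v in c.items()]
    dynamic = [f'{f} IS NOT NULL' for f in rule.get("group_by", [])]
    global_q = '(' + ' AND '.join(static + dynamic) + ')'
    steps = []
    for step in rule["sequence"]:
        conds = [f'{k} = "{v}"' for c in step.get("check", []) for k, v in c.items()]
        steps.append('(' + ' AND '.join(conds) + ')')
    return f'{global_q} AND ({" OR ".join(steps)})'

# The third rule above, expressed as the dict a YAML loader would produce:
rule = {
    "check": [{"event.category": "authentication"}],
    "group_by": ["source.ip", "user.name"],
    "sequence": [
        {"check": [{"event.action": "logon-failed"}], "frequency": 5},
        {"check": [{"event.action": "logon-success"}], "frequency": 1},
    ],
}
print(build_condition(rule))
```

Note that `frequency` does not appear in the condition: how many hits a step needs is enforced cache-side, not in the query.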
Python PoC with cache:

#!python3
import yaml
import argparse
import signal
import sys
from typing import List, Optional, Mapping, Any, Union, Collection
from datetime import datetime, timezone
import time
from opensearchpy import OpenSearch, Transport, TransportError, ConnectionTimeout, ConnectionError # pip install opensearch-py
from prettytable import PrettyTable # pip install prettytable
# OpenSearch connection settings
OS_HOST = "localhost"
OS_PORT = 9200
OS_USER = "admin"
OS_PASS = "wazuhEngine5+"
# Don't forget to change the index name; no pattern support in the WHERE clause
INDEX = "wazuh-alerts-5.x-*"
TS_FIELD = "event.ingested"
DEBUG_LEVEL = 2
################################################
# Metrics
################################################

class QueryMetrics:
    def __init__(self, query: str, bytes_sent: int, bytes_received: int, time_r: float) -> None:
        self.query = query
        self.bytes_sent = bytes_sent
        self.bytes_received = bytes_received
        self.time_r = time_r
################################################
# Opensearch and printers
################################################

class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
class OpenSearchConnector:
    '''
    Class that represents a connection to OpenSearch.

    Attributes:
    - opensearch: OpenSearch: OpenSearch object
    '''

    class CustomTransport(Transport):
        def perform_request(
            self,
            method: str,
            url: str,
            params: Optional[Mapping[str, Any]] = None,
            body: Any = None,
            timeout: Optional[Union[int, float]] = None,
            ignore: Collection[int] = (),
            headers: Optional[Mapping[str, str]] = None,
        ) -> Any:
            """
            Perform the actual request. Retrieve a connection from the connection
            pool, pass all the information to its perform_request method and
            return the data.

            If an exception was raised, mark the connection as failed and retry (up
            to `max_retries` times).

            If the operation was successful and the connection used was previously
            marked as dead, mark it as live, resetting its failure count.

            :arg method: HTTP method to use
            :arg url: absolute url (without host) to target
            :arg headers: dictionary of headers, will be handed over to the
                underlying :class:`~opensearchpy.Connection` class
            :arg params: dictionary of query parameters, will be handed over to the
                underlying :class:`~opensearchpy.Connection` class for serialization
            :arg body: body of the request, will be serialized using serializer and
                passed to the connection
            """
            method, params, body, ignore, timeout = self._resolve_request_args(
                method, params, body
            )

            for attempt in range(self.max_retries + 1):
                connection = self.get_connection()

                try:
                    # Calculate bytes sent
                    bytes_sent = 0
                    if headers:
                        bytes_sent += sum(len(k.encode('utf-8')) + len(v.encode('utf-8'))
                                          for k, v in headers.items())
                    if body:
                        if isinstance(body, str):
                            body_bytes = body.encode('utf-8')
                        elif isinstance(body, bytes):
                            body_bytes = body
                        else:
                            body_bytes = self.serializer.dumps(body).encode('utf-8')
                        bytes_sent += len(body_bytes)
                    if params:
                        bytes_sent += len(url.encode('utf-8')) + len(
                            '&'.join(f"{k}={v}" for k, v in params.items()).encode('utf-8'))
                    else:
                        bytes_sent += len(url.encode('utf-8'))

                    # Measure time
                    start_time = time.time()
                    status, headers_response, data = connection.perform_request(
                        method,
                        url,
                        params,
                        body,
                        headers=headers,
                        ignore=ignore,
                        timeout=timeout,
                    )
                    end_time = time.time()
                    time_r = end_time - start_time

                    # Lowercase all the header names for consistency in accessing them.
                    headers_response = {
                        header.lower(): value for header, value in headers_response.items()
                    }

                    # Calculate bytes received
                    bytes_received = 0
                    if headers_response:
                        bytes_received += sum(len(k.encode('utf-8')) + len(v.encode('utf-8'))
                                              for k, v in headers_response.items())
                    if data:
                        bytes_received += len(data)
                except TransportError as e:
                    if method == "HEAD" and e.status_code == 404:
                        return False

                    retry = False
                    if isinstance(e, ConnectionTimeout):
                        retry = self.retry_on_timeout
                    elif isinstance(e, ConnectionError):
                        retry = True
                    elif e.status_code in self.retry_on_status:
                        retry = True

                    if retry:
                        try:
                            # only mark as dead if we are retrying
                            self.mark_dead(connection)
                        except TransportError:
                            # If sniffing on failure, it could fail too. Catch the
                            # exception not to interrupt the retries.
                            pass
                        # raise exception on last retry
                        if attempt == self.max_retries:
                            raise e
                    else:
                        raise e
                else:
                    # connection didn't fail, confirm its live status
                    self.connection_pool.mark_live(connection)

                    if method == "HEAD":
                        return 200 <= status < 300

                    if data:
                        data = self.deserializer.loads(
                            data, headers_response.get("content-type")
                        )

                    # Return data and metrics
                    return data, bytes_sent, bytes_received, time_r

    def __init__(self):
        self.query_metrics: List[QueryMetrics] = []
        self.opensearch = OpenSearch(
            [{"host": OS_HOST, "port": OS_PORT}],
            http_auth=(OS_USER, OS_PASS),
            http_compress=True,
            use_ssl=True,
            verify_certs=False,
            timeout=30,
            ssl_show_warn=False,
            transport_class=self.CustomTransport
        )
        if not self.opensearch.ping():
            # Show the error message
            print(self.opensearch.info())
            exit(1)

        # Check if the SQL plugin is enabled
        list_json_plugins, _, _, _ = self.opensearch.cat.plugins(
            params={"s": "component", "v": "true", "format": "json"})
        list_plugins = [plugin["component"] for plugin in list_json_plugins]
        if "opensearch-sql" not in list_plugins:
            print("The SQL plugin is not enabled.")
            exit(1)
    def log_query(self, query: QueryMetrics) -> None:
        self.query_metrics.append(query)

    def dump_metrics(self) -> str:
        total_bytes_sent = sum(
            [query.bytes_sent for query in self.query_metrics])
        total_bytes_received = sum(
            [query.bytes_received for query in self.query_metrics])
        total_time = sum([query.time_r for query in self.query_metrics])
        dump = f"Total queries: {len(self.query_metrics)}\nTotal bytes sent: {total_bytes_sent}\nTotal bytes received: {total_bytes_received}\nTotal time: {total_time} seconds\n"

        mean_time = total_time / len(self.query_metrics)
        mean_bytes_sent = total_bytes_sent / len(self.query_metrics)
        mean_bytes_received = total_bytes_received / len(self.query_metrics)
        dump += f"Mean time: {mean_time} seconds\nMean bytes sent: {mean_bytes_sent}\nMean bytes received: {mean_bytes_received}\n"

        for i, query in enumerate(self.query_metrics):
            dump += f"\nQuery {i + 1}\n{query.query}\nBytes sent: {query.bytes_sent}\nBytes received: {query.bytes_received}\nTime: {query.time_r} seconds\n"
        return dump
def generate_event_table(keys, hits) -> str:
    '''
    Generate a table with the events, showing only the keys specified.

    Args:
    - keys: List[str]: List of the keys to show in the table
    - hits: List[dict]: List of the events to show in the table
    '''
    # Create a PrettyTable object
    table = PrettyTable()

    # Set columns from the keys
    # table.field_names = [k.split(".")[-1] for k in keys]
    keys.append("event.id")
    table.field_names = keys

    # Add rows to the table
    for entry in hits:
        # Ensure each key is present in the dictionary to avoid KeyError
        row = []
        for k in keys:
            it = entry
            for subk in k.split("."):
                try:
                    it = it[subk]
                except KeyError:
                    it = "-"
                    break
            row.append(it)
        table.add_row(row)

    # Set table style
    table.border = True
    table.horizontal_char = '-'
    table.vertical_char = '|'
    table.junction_char = '+'

    # Return the table as a string
    return table.get_string()
################################################
# Rule components
################################################

def get_field(event: dict, field: str, exit_on_error: bool = True):
    '''
    Get the value of a field in the event.

    Args:
    - event: dict: Event data
    - field: str: Field name
    - exit_on_error: bool: Exit the program if the field is not found

    Returns:
    - str: Value of the field in the event
    '''
    value = event
    for key in field.split("."):
        try:
            value = value[key]
        except KeyError:
            if exit_on_error:
                print(f"{bcolors.FAIL}Error: {field} not found in the event.{bcolors.ENDC}")
                exit(1)
            return None
    return value


class Entry:
    '''
    Class that represents an event in the sequence.

    Attributes:
    - step: int: Step of the event in the sequence
    - event: dict: Event data
    - timestamp: int: Timestamp of the event
    '''

    def __init__(self, step: int, event: dict) -> None:
        '''
        Create an Entry object.

        Args:
        - step: int: Step of the event in the sequence
        - event: dict: Event data
        '''
        self.step: int = step
        self.event = event

        # Get the timestamp of the event
        field = get_field(event, TS_FIELD)
        # If the field is not a string, convert it to one
        if not isinstance(field, str):
            field = str(field)
        field = field.replace('Z', '+00:00')
        date_obj = datetime.fromisoformat(field)
        self.timestamp = int(date_obj.timestamp())
class Cache:
    '''
    Class that represents a cache for the events; it accumulates the events of the same sequence.

    Attributes:
    - cache: dict: Cache of the events
    '''

    def __init__(self):
        self.elements = []

    def to_str(self, fields: List[str]) -> str:
        '''
        Cache to string.

        Args:
        - fields: List[str]: List of the fields to print
        '''
        return generate_event_table(fields, [entry.event for entry in self.elements])

    def __len__(self):
        return len(self.elements)


class _sql_static_field:
    '''
    Class that represents a field filter in the query.
    A field filter is a field that should be equal to a specific value.

    Attributes:
    - field: str: Field name
    - value: str: Value to compare
    '''

    def __init__(self, field: str, value: str):
        self.field = field
        self.value = value

    def get_query(self):
        '''
        Get the SQL condition for the field filter.

        Returns:
        - str: SQL condition for the field filter, i.e. (field = value)
        '''
        return f'{self.field} = "{self.value}"'

    def evaluate(self, hit: dict) -> bool:
        '''
        Evaluate whether the hit event matches the field filter.

        Args:
        - hit: dict: Hit event from OpenSearch to evaluate

        Returns:
        - bool: True if the event matches the field filter, False otherwise
        '''
        field = get_field(hit, self.field, False)
        if field is None:
            return False
        # If the field is a list, take the first element
        if isinstance(field, list):
            field = field[0]
        return field == self.value
class _sql_dinamic_field:
    '''
    Class that represents a field filter in the query, used for group-by fields.
    The field must exist, but no specific value is required: its value is compared
    across events instead of against a constant.

    Attributes:
    - field: str: Field name
    '''

    def __init__(self, field: str):
        self.field = field

    def get_query(self) -> str:
        '''
        Get the SQL query condition for the field filter.

        Returns:
        - str: SQL condition checking that the field exists, i.e. (field IS NOT NULL)
        '''
        return f'{self.field} IS NOT NULL'

    def evaluate(self, hit: dict) -> bool:
        '''
        Evaluate whether the hit event matches the field filter.

        Args:
        - hit: dict: Hit event from OpenSearch to evaluate

        Returns:
        - bool: True if the event matches the field filter, False otherwise
        '''
        field = get_field(hit, self.field, False)
        return field is not None


class Step:
    """
    Represents a step in the sequence.

    Attributes:
    - filter: list[_sql_static_field]: Required fields with values to filter the events (mandatory)
    - frequency: int: Frequency of the step (default 1 hit)
    - group_by_fields: list[_sql_dinamic_field]: Fields whose values must match across events (optional)
    """

    def __init__(self, filter: List[_sql_static_field], frequency: int, group_by_fields: List[_sql_dinamic_field]):
        self.filter: List[_sql_static_field] = filter
        self.frequency: int = frequency
        self.group_by_fields: List[_sql_dinamic_field] = group_by_fields

    def get_query(self) -> str:
        '''
        Get the query for the step.
        The query is a string that represents the step to fetch the events from OpenSearch.
        It combines the filter fields in an AND condition with the existence checks for the group-by fields.

        Returns:
        - str: SQL query for the step
        '''
        query = ' AND '.join([field.get_query() for field in self.filter + self.group_by_fields])
        return f'({query})'

    def evaluate(self, hit: dict) -> bool:
        '''
        Evaluate whether the hit event matches the step.

        Args:
        - hit: dict: Hit event from OpenSearch to evaluate

        Returns:
        - bool: True if the event matches the step, False otherwise
        '''
        for field in self.filter:
            if not field.evaluate(hit):
                return False
        for field in self.group_by_fields:
            if not field.evaluate(hit):
                return False
        return True
class Rule:
    """
    Rule class that contains the information of a rule.

    Attributes:
    - name: str - Name of the rule
    - timeframe: int - Timeframe for the rule
    - group_by_fields: list[str] - Fields whose values must match across events (optional)
    - static_fields: dict[str, str] - Fields that should be equal to a specific value (optional)
    - last_ingested: int - Last event fetched
    - _sequence: list[Step] - Sequence of the rule
    - _caches: dict - Cache of the events; each key is a hash of the group-by field values
    """

    def __init__(self, name: str, timeframe: int, group_by_fields: 'list[str]', static_fields: 'dict[str, str]'):
        '''
        Create a Rule object.

        Args:
        - name: str: Name of the rule
        - timeframe: int: Timeframe for the rule
        - group_by_fields: list[str]: Fields whose values must match across events (optional)
        - static_fields: dict[str, str]: Fields that should be equal to a specific value (optional)
        '''
        self.name: str = name
        self.timeframe: int = timeframe
        self.group_by_fields: List[_sql_dinamic_field] = [_sql_dinamic_field(field) for field in group_by_fields]
        self.static_fields: List[_sql_static_field] = [_sql_static_field(
            field, value) for field, value in static_fields.items()]
        self._sequence: List[Step] = []
        self.last_ingested: int = 0
        self._caches: dict = {}

    def add_step(self, step: Step):
        '''
        Add a step to the sequence of the rule.

        Args:
        - step: Step: Step to add to the sequence
        '''
        # Verify that the step has the same number of group-by fields as the previous step
        if len(self._sequence) > 0 and len(step.group_by_fields) != len(self._sequence[-1].group_by_fields):
            raise ValueError('Error: The step does not have the same number of group-by fields as the previous step.')
        self._sequence.append(step)

    def _get_global_query_condition(self):
        '''
        Get the global query condition for the rule.
        The global query condition combines the group-by fields and static fields in an AND condition;
        all events should match this condition.

        Returns:
        - str: SQL query for the global condition
        '''
        query = ' AND '.join([field.get_query() for field in self.group_by_fields + self.static_fields])
        # If there is no global query, return an empty string
        if query == '':
            return ''
        return f'({query})'

    def _get_condition(self) -> str:
        '''
        Get the condition query for the rule.
        The query is a string that represents the rule to fetch the events from OpenSearch.
        It combines the global query (AND) with the sequence queries (OR).

        Returns:
        - str: Query for the rule
        '''
        global_query = self._get_global_query_condition()
        sequence_query = ' OR '.join(
            [step.get_query() for step in self._sequence])
        # If there is no global query, return the sequence query
        if global_query == '':
            return sequence_query
        return f'{global_query} AND ({sequence_query})'

    def get_query(self) -> str:
        '''
        Get the query for the rule.
        The query is a string that represents the rule to fetch the events from OpenSearch.
        '''
        time_str = datetime.fromtimestamp(self.last_ingested, timezone.utc).strftime('%Y-%m-%dT%H:%M:%S')
        query_str = f"SELECT * FROM {INDEX} AS idx WHERE {TS_FIELD} > '{time_str}' AND ({self._get_condition()}) ORDER BY {TS_FIELD} ASC;"
        return query_str

    def _list_fields(self) -> List[str]:
        '''
        List all the unique fields the rule is interested in.

        Returns:
        - List[str]: List of the fields of the rule
        '''
        fields: List[str] = [TS_FIELD]
        for field in self.group_by_fields:
            fields.append(field.field)
        for field in self.static_fields:
            fields.append(field.field)
        for step in self._sequence:
            for field in step.filter:
                fields.append(field.field)
            for field in step.group_by_fields:
                fields.append(field.field)
        # Remove duplicates
        fields = list(dict.fromkeys(fields))
        return fields
    def _fetch_events(self, osc: OpenSearchConnector) -> List[dict]:
        '''
        Fetch the events from OpenSearch.
        The function fetches the events since the last ingested event.
        '''
        if DEBUG_LEVEL > 0:
            print(f'{bcolors.BOLD}{bcolors.OKBLUE}Fetching events...{bcolors.ENDC}{bcolors.ENDC}')

        response = None
        query = self.get_query()
        try:
            response, bytes_sent, bytes_received, time_r = osc.opensearch.transport.perform_request(
                url="/_plugins/_sql/",
                method="POST",
                params={"format": "json", "request_timeout": 30},
                body={"query": query}
            )
            osc.log_query(QueryMetrics(query, bytes_sent, bytes_received, time_r))
        except Exception as e:
            print(f"Error: {e}")
            exit(1)

        # Not sure whether this error handling is correct
        if 'error' in response:
            print(f"Error: {response['error']['reason']}")
            exit(1)

        if 'hits' not in response or 'hits' not in response['hits'] or len(response['hits']['hits']) == 0:
            return []

        # Save the last time of the query
        last_ingested_str = response['hits']['hits'][-1]['_source']['event']['ingested']
        self.last_ingested = int(datetime.fromisoformat(
            last_ingested_str.replace('Z', '+00:00')).timestamp())
        if DEBUG_LEVEL > 0:
            print(f"Last ingested: {last_ingested_str}")
        if DEBUG_LEVEL > 1:
            print(f"Query: {query}")

        # Create a list of events from the response
        hit_list = response['hits']['hits']

        # Check the events
        events: List[dict] = []
        for hit in hit_list:
            events.append(hit['_source'])

        if DEBUG_LEVEL > 0:
            print(f"Events fetched: {len(events)}")
        if DEBUG_LEVEL > 1:
            print(f'{bcolors.OKGREEN}{generate_event_table(self._list_fields(), events)}{bcolors.ENDC}')

        return events
    def _fill_cache(self, events: List[dict]):
        '''
        Fill the cache with the events.
        The function inserts the events into the cache.
        '''
        if DEBUG_LEVEL > 0:
            print(f'{bcolors.BOLD}{bcolors.OKBLUE}Filling cache...{bcolors.ENDC}{bcolors.ENDC}')

        # Check which stage an event belongs to
        for event in events:
            entry = None
            # Check if the event matches a Step
            for i, step in enumerate(self._sequence):
                if step.evaluate(event):
                    entry = Entry(i, event)
                    break
            if entry is None:
                print(f"{bcolors.FAIL}Error: Event does not match any step.{bcolors.ENDC}")
                exit(1)

            # Get the values of the rule-level and step-level group-by fields
            rule_group_by_fields_values = [get_field(event, field.field) for field in self.group_by_fields]
            group_by_fields_values = [get_field(event, field.field) for field in self._sequence[entry.step].group_by_fields]
            obj_hash = hash(tuple(rule_group_by_fields_values + group_by_fields_values))
            str_hash = str(obj_hash)

            if DEBUG_LEVEL > 2:
                print(f"---- ANALYZING EVENT ----")
                # print(f"Event: {event}")
                print(f"Event ts: {get_field(event, TS_FIELD)}")
                print(f"Step: {entry.step}")
                print(f"Rule group-by values: {rule_group_by_fields_values}")
                print(f"Step group-by values: {group_by_fields_values}")
                print(f"Hash: {str_hash}")

            # Search the cache
            if str_hash in self._caches:
                cache = self._caches[str_hash]
                cache.elements.append(entry)
            else:
                cache = Cache()
                cache.elements.append(entry)
                self._caches[str_hash] = cache

        # Print the cache
        if DEBUG_LEVEL > 0:
            print(f'{bcolors.BOLD}{bcolors.HEADER}*=============== New cache state for rule: {self.name} ===============*{bcolors.ENDC}{bcolors.ENDC}')
            for key, cache in self._caches.items():
                print(f"Key: {key}")
                print(f'{bcolors.OKGREEN}{cache.to_str(self._list_fields())}{bcolors.ENDC}')
class _match_result:
'''
Class that represents the result of the match of the cache with the sequence.
Attributes:
- dirty: bool: True if the cache was modified
- events: list[dict]: List of the events that match the sequence
'''
def __init__(self, cache_modified: bool, events: List[dict] = []):
self._events: List[dict] = events
self._cache_modified: bool = cache_modified
def __bool__(self) -> bool:
return self._cache_modified
def alert_events(self) -> List[dict]:
return self._events
def _match_sequence(self, cache: Cache) -> _match_result:
'''
Check if the cache match the sequence.
Args:
- cache: Cache: Cache to check
Returns:
- _match_result: Result of the match
'''
current: int = 0 # Current event index
step_index: int = 0 # Current step index
hit_counter: int = 0 # Current count of events that match the step condition
# List index of events that match the condition of the step
list_success_events: List[int] = []
# Iterate over the events in the cache
while current < len(cache.elements) and step_index < len(self._sequence):
# Check which step the event matches
event = cache.elements[current]
# If the event is from the next step, remove the event from the cache
if event.step > step_index:
cache.elements.pop(current)
continue
# If the event is from the previous step, skip it
if event.step < step_index:
current += 1
continue
# If it is the same step, check whether the event falls outside the timeframe
if event.timestamp - cache.elements[0].timestamp > self.timeframe:
# Remove first element, and try all steps again
cache.elements.pop(0)
return self._match_result(True)
# The event meets the step condition
hit_counter += 1 # Increase the hit counter
# Add the event to the list of successful events
list_success_events.append(current)
current += 1 # Move to the next event
# Check if the step condition was met the required number of times
if hit_counter == self._sequence[step_index].frequency:
# Move to the next step
step_index += 1
hit_counter = 0
# Matched all the steps
if step_index == len(self._sequence):
# Extract the matched events from the cache
alert_event: List[dict] = []
for i in reversed(list_success_events):
alert_event.append(cache.elements.pop(i).event)
alert_event.reverse()
return self._match_result(True, alert_event)
return self._match_result(False)
def match(self):
# Iterate over the caches
print(f'{bcolors.BOLD}{bcolors.HEADER}*=============== Matching sequences for rule: {self.name} ===============*{bcolors.ENDC}{bcolors.ENDC}')
new_caches_state: bool = False
for key, cache in self._caches.items():
# Check if the cache matches the sequence and return the events
while result := self._match_sequence(cache):
new_caches_state = True
if len(result.alert_events()) > 0:
print(f'{bcolors.BOLD}{bcolors.UNDERLINE}{bcolors.OKCYAN}Matched sequence:{bcolors.ENDC}{bcolors.ENDC}{bcolors.ENDC}')
print(f'source cache key: {key}')
print(f'{bcolors.OKCYAN}{generate_event_table(self._list_fields(), result.alert_events())}{bcolors.ENDC}')
print()
if DEBUG_LEVEL > 0 and new_caches_state:
print(f'{bcolors.BOLD}{bcolors.HEADER}*=============== New cache state for rule: {self.name} ===============*{bcolors.ENDC}{bcolors.ENDC}')
for key, cache in self._caches.items():
print(f"Key: {key}")
print(f'{bcolors.OKGREEN}{cache.to_str(self._list_fields())}{bcolors.ENDC}')
def update(self, osc: OpenSearchConnector):
'''
Fetch the events from OpenSearch and update the cache.
Fetches the events since the last ingested event and updates the cache with the new events.
'''
if DEBUG_LEVEL > 0:
print(f'{bcolors.BOLD}{bcolors.HEADER}*=============== Updating rule: {self.name} ===============*{bcolors.ENDC}{bcolors.ENDC}')
events = self._fetch_events(osc)
if len(events) == 0:
if DEBUG_LEVEL > 0:
print(f'{bcolors.WARNING}No events fetched.{bcolors.ENDC}')
return
self._fill_cache(events)
self.match()
################################################
# Parsers
################################################
# File
def parse_yaml(file_path: str) -> dict:
'''
Parse a YAML file and return the data as a dictionary.
'''
with open(file_path, 'r') as file:
data = yaml.safe_load(file)
return data
def parse_step(sequence: dict) -> Step:
# Filter events to fetch on OpenSearch (Mandatory)
static_fields_list: list[_sql_static_field] = []
frequency: int = 1 # Frequency of the step (default 1 hit)
# Fields that should be equal (optional)
group_by_fields: list[_sql_dinamic_field] = []
# Get the filter (if present, it should be a list of field/value objects)
try:
raw_check = sequence['check']
# Check if the filter is a list of object with key and value strings
if not isinstance(raw_check, list) or not all(isinstance(field, dict) for field in raw_check):
raise ValueError('Error: filter must be an array of object with key and value strings.')
# iterate over the filter (raw_check) adding them to the static_fields_list
for field in raw_check:
for field_name, field_value in field.items():
static_fields_list.append(_sql_static_field(field_name, field_value))
except KeyError:
raise ValueError('Error: filter not found in the step.')
# Get the frequency (if present, it should be a positive integer)
try:
frequency = sequence['frequency']
# Check if the frequency is an integer and positive
if not isinstance(frequency, int) or frequency < 1:
raise ValueError('Error: frequency must be a positive integer.')
except KeyError:
pass
# Get the equal fields (if present, it should be a list of strings)
try:
raw_group_by = sequence['group_by']
# Check if the raw_group_by is a list of strings
if not isinstance(raw_group_by, list) or not all(isinstance(field, str) for field in raw_group_by):
raise ValueError('Error: group_by_fields must be a list of strings.')
# Create the group_by_fields
group_by_fields = [_sql_dinamic_field(field) for field in raw_group_by]
except KeyError:
pass
return Step(static_fields_list, frequency, group_by_fields)
def parse_rule(rule: dict) -> Rule:
timeframe: int = 0 # Timeframe for the rule
group_by_fields: list[str] = [] # Fields that should be the same (optional)
# Fields that should be equal to a specific value (optional)
static_fields: dict[str, str] = {}
name: str = "No name"
# Get the name of the rule
try:
name = rule['name']
except KeyError:
pass
# Get the timeframe
try:
timeframe = rule['timeframe']
# Check if the timeframe is an integer and positive
if not isinstance(timeframe, int) or timeframe < 1:
raise ValueError('Error: timeframe must be a positive integer.')
except KeyError:
raise ValueError('Error: timeframe not found in the rule.')
# Get the same fields (if present, it should be a list of strings)
try:
group_by_fields = rule['group_by']
# Check if the group_by_fields is a list of strings
if not isinstance(group_by_fields, list) or not all(isinstance(field, str) for field in group_by_fields):
raise ValueError('Error: group_by_fields must be a list of strings.')
except KeyError:
pass
# Get the static fields (if present, it should be a list of field/value objects)
try:
raw_static_fields = rule['check']
# Check if the static_fields is a list of object with key and value strings
if not isinstance(raw_static_fields, list) or not all(isinstance(field, dict) for field in raw_static_fields):
raise ValueError('Error: static_fields must be a list of object with key and value strings.')
# iterate over the static fields (raw_static_fields) adding them to the static_fields
for field in raw_static_fields:
for field_name, field_value in field.items():
static_fields[field_name] = field_value
except KeyError:
pass
# Check if the sequence is a list of steps
try:
sequence = rule['sequence']
# Check if the sequence is a list of steps
if not isinstance(sequence, list) or not all(isinstance(step, dict) for step in sequence):
raise ValueError('Error: sequence must be a list of steps.')
except KeyError:
raise ValueError('Error: sequence not found in the rule.')
# Create the rule
r = Rule(name, timeframe, group_by_fields, static_fields)
# Parse the steps
for step in sequence:
s = parse_step(step)
r.add_step(s)
return r
################################################
def main(yaml_file):
osc = OpenSearchConnector()
rule_definitions = parse_yaml(yaml_file)
rule_list : List[Rule] = []
# Check if rule_definition is a list of rules (Array of objects)
if not isinstance(rule_definitions, list) or not all(isinstance(rule_definition, dict) for rule_definition in rule_definitions):
raise ValueError('Error: rule_definition must be an array of objects.')
for rule_definition in rule_definitions:
rule = parse_rule(rule_definition)
rule_list.append(rule)
try:
while True:
for rule in rule_list:
rule.update(osc)
time.sleep(5)
except KeyboardInterrupt:
print(f"{bcolors.WARNING}Exiting...{bcolors.ENDC}")
print(f'{bcolors.BOLD}{bcolors.HEADER}*=============== Stats ===============*{bcolors.ENDC}{bcolors.ENDC}')
print(osc.dump_metrics())
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Load a sequence rule.')
parser.add_argument(
'-i', type=str, help='Path to the input YAML file.', required=True)
args = parser.parse_args()
main(args.i)

Next steps:
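The cache-keying idea used by the script above (bucketing events by a hash of their group-by field values) can be sketched in isolation. The field names and events below are hypothetical, for illustration only:

```python
# Sketch of the cache-keying approach: events are bucketed by a hash of
# their group-by field values. Events and fields here are hypothetical.

def get_field(event: dict, dotted: str):
    # Walk a dotted path like "host.id" through nested dicts
    cur = event
    for part in dotted.split('.'):
        cur = cur[part]
    return cur

def cache_key(event: dict, group_by: list) -> str:
    values = tuple(get_field(event, f) for f in group_by)
    return str(hash(values))

events = [
    {'host': {'id': 'A'}, 'event': {'id': 1}},
    {'host': {'id': 'B'}, 'event': {'id': 2}},
    {'host': {'id': 'A'}, 'event': {'id': 3}},
]
caches = {}
for ev in events:
    caches.setdefault(cache_key(ev, ['host.id']), []).append(ev)
print(len(caches))  # → 2: one bucket per distinct host.id
```

Events sharing the same group-by values land in the same bucket, so each sequence search only has to scan one bucket.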
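The per-cache matching loop above (advance to the next step once its frequency is met, all within the timeframe) can be reduced to an in-memory toy. Steps, timestamps, and frequencies below are hypothetical:

```python
# Toy version of the sequence matcher above: events tagged with the step
# they satisfy must hit each step's frequency, in order, inside the window.
# Steps and timestamps are hypothetical.

def match_sequence(elements, frequencies, timeframe):
    """elements: list of (step, timestamp); returns matched timestamps or None."""
    step, hits, matched = 0, 0, []
    for ev_step, ts in elements:
        if ev_step != step:
            continue
        if ts - elements[0][1] > timeframe:
            return None  # fell outside the window
        hits += 1
        matched.append(ts)
        if hits == frequencies[step]:
            step, hits = step + 1, 0
            if step == len(frequencies):
                return matched
    return None

# Two step-0 hits followed by one step-1 hit, all inside a 60 s window
events = [(0, 0), (0, 10), (1, 20)]
print(match_sequence(events, frequencies=[2, 1], timeframe=60))  # → [0, 10, 20]
```

Unlike the toy, the real implementation also prunes stale and future-step entries from the cache as it scans, but the step/frequency bookkeeping is the same.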
Update

Added event generator and improved second approach PoC output. Rule used for testing:

name: decoder/json_all/0
parse|event.original:
- <tmp/json>
normalize:
- map:
- event.id: $tmp.event.id
- event.ingested: parse_date($tmp.event.ingested, "%FT%T")
- event.category: array_append($tmp.event.category)
- host.os.type: $tmp.host.os.type
- host.id: $tmp.host.id
- event.type: array_append($tmp.event.type)
- process.name: $tmp.process.name
- user.name: $tmp.user.name
- dll.name: $tmp.dll.name
- process.parent.name: $tmp.process.parent.name
- process.parent.args: array_append($tmp.process.parent.args)
- network.protocol: $tmp.network.protocol
- process.name: $tmp.process.name
- source.ip: $tmp.source.ip
- event.action: $tmp.event.action
- map:
- event.original: delete()
- tmp: delete()

Second approach PoC

Input:

- name: Potential DNS Tunneling via NsLookup
total_events: 10000
success: 0.1
distribution: 0.5
groups: 5
template:
host.id: DYN_KEYWORD
event.category: process
host.os.type: windows
sequence:
window: 300
steps:
- freq: 10
template:
event.type: start
process.name: nslookup.exe
- name: Potential Command and Control via Internet Explorer
total_events: 10000
success: 0.1
distribution: 0.5
groups: 5
template:
host.id: DYN_KEYWORD
user.name: DYN_KEYWORD
host.os.type: windows
sequence:
window: 5
steps:
- freq: 1
template:
event.category: library
dll.name: IEProxy.dll
- freq: 1
template:
event.category: process
event.type: start
process.parent.name : iexplore.exe
process.parent.args : -Embedding
- freq: 5
template:
event.category: network
network.protocol: dns
process.name: iexplore.exe
- name: Multiple Logon Failure Followed by Logon Success
total_events: 10000
success: 0.1
distribution: 0.5
groups: 5
template:
source.ip: DYN_IP
user.name: DYN_KEYWORD
event.category: authentication
sequence:
window: 5
steps:
- freq: 5
template:
event.action: logon-failed
- freq: 1
template:
- event.action: logon-success

script

from datetime import datetime, timedelta, timezone
from typing import List, Optional, Union, Mapping, Any, Collection
from argparse import ArgumentParser as AP
from pathlib import Path
from colorama import Fore, Back, Style, init
import time
import yaml
from opensearchpy import OpenSearch, Transport, ConnectionError, ConnectionTimeout, TransportError
from prettytable import PrettyTable # pip install prettytable
# OpenSearch connection settings
host = "localhost"
port = 9200
# Don't forget to change the index name; no pattern support in the WHERE clause
INDEX = "wazuh-alerts-5.x-*"
username = "admin"
password = "wazuhEngine5+"
def generate_event_table(keys, hits) -> str:
# Create a PrettyTable object
table = PrettyTable()
# Set columns from the keys
table.field_names = keys
# Add rows to the table
for entry in hits:
# Ensure each key is present in the dictionary to avoid KeyError
row = []
for k in keys:
it = entry
for subk in k.split("."):
it = it[subk]
row.append(it)
table.add_row(row)
# Set table style
table.border = True
table.horizontal_char = '-'
table.vertical_char = '|'
table.junction_char = '+'
# Return the table as a string
return table.get_string()
class SCondition:
def __init__(self, field: str, value: str) -> None:
self.field = field
self.value = value
def query(self) -> str:
return f'{self.field} = "{self.value}"'
def eval(self, hit: dict) -> bool:
field = hit
for key in self.field.split("."):
field = field[key]
return field == self.value
class DCondition:
def __init__(self, field: str) -> None:
self.field = field
def query(self) -> str:
return f'{self.field} IS NOT NULL'
def eval(self, hit: dict) -> str:
field = hit
for key in self.field.split("."):
field = field[key]
return field
class SConditions:
def __init__(self, conditions: List[SCondition]) -> None:
self.conditions = conditions
def query(self) -> str:
return "(" + " AND ".join([c.query() for c in self.conditions]) + ")"
def eval(self, hit: dict) -> bool:
return all([c.eval(hit) for c in self.conditions])
class DConditions:
def __init__(self, conditions: List[DCondition]) -> None:
self.conditions = conditions
def query(self) -> str:
return "(" + " AND ".join([c.query() for c in self.conditions]) + ")"
def eval(self, hit: dict) -> str:
return "_".join([c.eval(hit) for c in self.conditions])
def matchQuery(self, key: str) -> str:
values = key.split("_")
return "(" + " AND ".join([f'{self.conditions[i].field} = "{values[i]}"' for i in range(len(values))]) + ")"
def select(self) -> str:
return ", ".join([c.field for c in self.conditions])
def fields(self) -> List[str]:
return [c.field for c in self.conditions]
class Step:
def __init__(self, condition: SConditions, frequency: int = 1) -> None:
self.condition = condition
self.frequency = frequency
def query(self) -> str:
return self.condition.query()
class Sequence:
def __init__(self, definition: dict) -> None:
# Only one constructor is kept: a previous overload taking
# (static, dynamic, timeframe) was shadowed by this one, since Python
# does not support constructor overloading. parse() sets static,
# dynamic, timeframe and steps.
self.parse(definition)
def add_step(self, step: Step):
self.steps.append(step)
return self
def last(self) -> int:
return len(self.steps) - 1
def size(self) -> int:
return len(self.steps)
def matchQuery(self, key: str) -> str:
return self.dynamic.matchQuery(key)
def parse(self, definition: dict) -> None:
self.static = SConditions([SCondition(
field, value) for c in definition['check'] for field, value in c.items()])
self.dynamic = DConditions([DCondition(field)
for field in definition['group_by']])
self.timeframe = definition['timeframe']
self.steps = [Step(SConditions([SCondition(field, value) for c in step['check']
for field, value in c.items()]), step['frequency']) for step in definition['sequence']]
class QueryMetrics:
def __init__(self, query: str, bytes_sent: int, bytes_received: int, time_r: float) -> None:
self.query = query
self.bytes_sent = bytes_sent
self.bytes_received = bytes_received
self.time_r = time_r
class CustomTransport(Transport):
def perform_request(
self,
method: str,
url: str,
params: Optional[Mapping[str, Any]] = None,
body: Any = None,
timeout: Optional[Union[int, float]] = None,
ignore: Collection[int] = (),
headers: Optional[Mapping[str, str]] = None,
) -> Any:
"""
Perform the actual request. Retrieve a connection from the connection
pool, pass all the information to its perform_request method and
return the data.
If an exception was raised, mark the connection as failed and retry (up
to `max_retries` times).
If the operation was successful and the connection used was previously
marked as dead, mark it as live, resetting its failure count.
:arg method: HTTP method to use
:arg url: absolute url (without host) to target
:arg headers: dictionary of headers, will be handed over to the
underlying :class:`~opensearchpy.Connection` class
:arg params: dictionary of query parameters, will be handed over to the
underlying :class:`~opensearchpy.Connection` class for serialization
:arg body: body of the request, will be serialized using serializer and
passed to the connection
"""
method, params, body, ignore, timeout = self._resolve_request_args(
method, params, body
)
for attempt in range(self.max_retries + 1):
connection = self.get_connection()
try:
# Calculate bytes sent
bytes_sent = 0
if headers:
bytes_sent += sum(len(k.encode('utf-8')) + len(v.encode('utf-8'))
for k, v in headers.items())
if body:
if isinstance(body, str):
body_bytes = body.encode('utf-8')
elif isinstance(body, bytes):
body_bytes = body
else:
body_bytes = self.serializer.dumps(
body).encode('utf-8')
bytes_sent += len(body_bytes)
if params:
bytes_sent += len(url.encode('utf-8')) + len(
'&'.join(f"{k}={v}" for k, v in params.items()).encode('utf-8'))
else:
bytes_sent += len(url.encode('utf-8'))
# Measure time
start_time = time.time()
status, headers_response, data = connection.perform_request(
method,
url,
params,
body,
headers=headers,
ignore=ignore,
timeout=timeout,
)
# Measure time
end_time = time.time()
time_r = end_time - start_time
# Lowercase all the header names for consistency in accessing them.
headers_response = {
header.lower(): value for header, value in headers_response.items()
}
# Calculate bytes received
bytes_received = 0
if headers_response:
bytes_received += sum(len(k.encode('utf-8')) + len(v.encode('utf-8'))
for k, v in headers_response.items())
if data:
bytes_received += len(data)
except TransportError as e:
if method == "HEAD" and e.status_code == 404:
return False
retry = False
if isinstance(e, ConnectionTimeout):
retry = self.retry_on_timeout
elif isinstance(e, ConnectionError):
retry = True
elif e.status_code in self.retry_on_status:
retry = True
if retry:
try:
# only mark as dead if we are retrying
self.mark_dead(connection)
except TransportError:
# If sniffing on failure, it could fail too. Catch the
# exception not to interrupt the retries.
pass
# raise exception on last retry
if attempt == self.max_retries:
raise e
else:
raise e
else:
# connection didn't fail, confirm its live status
self.connection_pool.mark_live(connection)
if method == "HEAD":
return 200 <= status < 300
if data:
data = self.deserializer.loads(
data, headers_response.get("content-type")
)
# Return data and metrics
return data, bytes_sent, bytes_received, time_r
class Rule:
"""
Represents a rule to apply to a pattern, the rule is responsible for querying the events
and applying the conditions to the events.
Parameters
----------
index : str
The pattern index to apply the rule.
Returns
-------
Rule
"""
def __init__(self, index: str, name: str) -> None:
self.name = name
self.opensearch = OpenSearch(
[{"host": host, "port": port}],
http_auth=(username, password),
http_compress=True,
use_ssl=True,
verify_certs=False,
timeout=30,
ssl_show_warn=False,
transport_class=CustomTransport
)
if not self.opensearch.ping():
# Show the error message
print(self.opensearch.info())
exit(1)
# Check if sql plugin is enabled
list_json_plugins, _, _, _ = self.opensearch.cat.plugins(
params={"s": "component", "v": "true", "format": "json"})
list_plugins = [plugin["component"] for plugin in list_json_plugins]
if "opensearch-sql" not in list_plugins:
print("The SQL plugin is not enabled.")
exit(1)
self.index = index
self.query_metrics = []
def log_query(self, query: QueryMetrics) -> None:
self.query_metrics.append(query)
def get(self, query: str):
response = None
try:
response, bytes_sent, bytes_received, time_r = self.opensearch.transport.perform_request(
url="/_plugins/_sql",
method="POST",
params={"format": "json", "request_timeout": 30},
body={"query": query}
)
self.log_query(QueryMetrics(
query, bytes_sent, bytes_received, time_r))
except Exception as e:
print(f"Error: {e}")
exit(1)
# Not sure if this is right
if 'error' in response:
print(f"Error: {response['error']['reason']}")
exit(1)
if 'hits' not in response or 'hits' not in response['hits'] or len(response['hits']['hits']) == 0:
return []
return [event['_source'] for event in response['hits']['hits']]
def dump_metrics(self) -> str:
total_bytes_sent = sum(
[query.bytes_sent for query in self.query_metrics])
total_bytes_received = sum(
[query.bytes_received for query in self.query_metrics])
total_time = sum([query.time_r for query in self.query_metrics])
dump = f"Total queries: {len(self.query_metrics)}\nTotal bytes sent: {total_bytes_sent}\nTotal bytes received: {total_bytes_received}\nTotal time: {total_time} seconds\n"
# Guard against division by zero when no queries were logged
if len(self.query_metrics) == 0:
return dump
mean_time = total_time / len(self.query_metrics)
mean_bytes_sent = total_bytes_sent / len(self.query_metrics)
mean_bytes_received = total_bytes_received / len(self.query_metrics)
dump += f"Mean time: {mean_time} seconds\nMean bytes sent: {mean_bytes_sent}\nMean bytes received: {mean_bytes_received}\n"
for i, query in enumerate(self.query_metrics):
dump += f"\nQuery {i + 1}\n{query.query}\nBytes sent: {query.bytes_sent}\nBytes received: {query.bytes_received}\nTime: {query.time_r} seconds\n"
return dump
class StateSeqRule(Rule):
def __init__(self, index: str, sequence: Sequence, name: str) -> None:
super().__init__(index, name)
self._sequence = sequence
self._last_time = datetime(1970, 1, 1, tzinfo=timezone.utc).timestamp()
def _common_query(self) -> str:
return f"SELECT event.id, event.ingested, {self._sequence.dynamic.select()} FROM {self.index} AS idx WHERE {self._sequence.static.query()} AND {self._sequence.dynamic.query()}"
def _sequence_initial_query(self) -> str:
return self._common_query() + f" AND {self._sequence.steps[-1].query()} AND event.ingested > '{datetime.fromtimestamp(self._last_time, timezone.utc).strftime('%Y-%m-%dT%H:%M:%S')}' ORDER BY event.ingested ASC;"
def _group_by_dynamic(self, hits: List[dict]) -> dict:
grouped_hits = {}
for hit in hits:
dynKey = self._sequence.dynamic.eval(hit)
if dynKey not in grouped_hits:
grouped_hits[dynKey] = []
grouped_hits[dynKey].append(hit)
return grouped_hits
def _search_sequence(self, dyn: str, hits: List[dict], step: int, lower_bound: Optional[str] = None, acc: Optional[List[dict]] = None) -> bool:
# Check current step matches frequency
print(Fore.LIGHTCYAN_EX + f'Checking step {step}')
print(Fore.LIGHTCYAN_EX +
f'Check frequency: {len(hits)} >= {self._sequence.steps[step].frequency}')
if (len(hits) < self._sequence.steps[step].frequency):
print(f'Not enough events for step {step}')
return False
# Select the oldest frequency events and start a search sequence for each group
groups = [hits[i:i + self._sequence.steps[step].frequency]
for i in range(0, len(hits), self._sequence.steps[step].frequency)]
print(Fore.LIGHTCYAN_EX + f'Found {len(groups)} groups')
for i, group in enumerate(groups):
print(Fore.LIGHTCYAN_EX + f'Group {i}')
keys = ['event.id', 'event.ingested']
keys.extend(self._sequence.dynamic.fields())
print(generate_event_table(keys, group))
# Add the key to the accumulator
next_acc = []
if acc is None:
next_acc = group
else:
next_acc.extend(acc)
next_acc.extend(group)
# Query for previous step
prev_step = step - 1
# Check found condition
print(Fore.LIGHTCYAN_EX +
f'Checking if step {step} is the first step')
if prev_step < 0:
print(Fore.RED + 'Sequence found!')
print(Fore.RED + generate_event_table(['event.id'], next_acc))
return True
# Calculate upper bound
print(Fore.LIGHTCYAN_EX +
f'Generating new query for step {prev_step}')
upper_bound_date = datetime.strptime(
group[0]['event']['ingested'], "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc).timestamp()
upper_bound = datetime.fromtimestamp(
upper_bound_date, timezone.utc).strftime('%Y-%m-%dT%H:%M:%S')
# Calculate lower bound
if lower_bound is None:
lower_bound = (datetime.strptime(
group[0]['event']['ingested'], "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc) - timedelta(
seconds=self._sequence.timeframe)).strftime('%Y-%m-%dT%H:%M:%S')
print(Fore.LIGHTCYAN_EX + f'Upper bound: {upper_bound}')
print(Fore.LIGHTCYAN_EX + f'Lower bound: {lower_bound}')
query = self._common_query() + \
f" AND {self._sequence.steps[prev_step].query()} AND {self._sequence.matchQuery(dyn)} AND event.ingested >= '{lower_bound}' AND event.ingested <= '{upper_bound}' ORDER BY event.ingested ASC;"
print(f'Query: {Fore.LIGHTYELLOW_EX + query}')
hits = self.get(query)
print(generate_event_table(keys, hits))
found = self._search_sequence(
dyn, hits, prev_step, lower_bound, next_acc)
if not found:
continue
return True
# No group completed the sequence
return False
def update(self) -> None:
header = f'****************** {self.name} ******************'
footer = ''.join(['*' for _ in range(len(header))])
print('\n\n' + Fore.GREEN + header)
print('Timestamp: ', datetime.fromtimestamp(
self._last_time, timezone.utc).strftime('%Y-%m-%dT%H:%M:%S'))
initial_query = self._sequence_initial_query()
print(f'Initial query: {Fore.LIGHTYELLOW_EX + initial_query}')
hits = self.get(initial_query)
if len(hits) == 0:
print('No new events')
print(Fore.GREEN + footer)
return
self._last_time = datetime.strptime(
hits[-1]['event']['ingested'], "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc).timestamp()
keys = ['event.id', 'event.ingested']
keys.extend(self._sequence.dynamic.fields())
print(generate_event_table(keys, hits))
print('New Timestamp: ', datetime.fromtimestamp(
self._last_time, timezone.utc).strftime('%Y-%m-%dT%H:%M:%S'))
print(Fore.LIGHTBLUE_EX + '\n\nSearching for sequences...')
grouped_hits = self._group_by_dynamic(hits)
for dyn, hits in grouped_hits.items():
print(f'\nGrouped by: {Fore.LIGHTBLUE_EX + dyn}')
print(generate_event_table(keys, hits))
self._search_sequence(dyn, hits, self._sequence.last())
print(Fore.LIGHTBLUE_EX + 'Finished searching for sequences.')
print(Fore.GREEN + footer)
def main(input_rules_path: str):
init(autoreset=True)
# Load the rules
in_rules = Path(input_rules_path)
if not in_rules.exists():
print(f"File {in_rules} not found")
exit(1)
if not in_rules.is_file():
print(f"{in_rules} is not a file")
exit(1)
# Load the yaml rules
rules = []
with open(in_rules, 'r') as stream:
try:
rules = yaml.safe_load(stream)
except yaml.YAMLError as exc:
print(exc)
exit(1)
index = "wazuh-alerts-5.x-*"
# Create the sequence rules
seq_rules = [StateSeqRule(index, Sequence(rule), rule['name'])
for rule in rules]
try:
while True:
for seqRule in seq_rules:
seqRule.update()
time.sleep(15)
except KeyboardInterrupt:
print('\n\n****************** Metrics ******************')
for seqRule in seq_rules:
print(seqRule.dump_metrics())
print('************************************')
exit(0)
if __name__ == "__main__":
parser = AP(description="Correlation POC second approach")
parser.add_argument("input_rules_path", help="Path to the rules file")
args = parser.parse_args()
main(args.input_rules_path)

Event generator

Input:

- name: Potential DNS Tunneling via NsLookup
check:
- event.category: process
- host.os.type: windows
group_by:
- host.id
timeframe: 300
sequence:
- frequency: 10
check:
- event.type: start
- process.name: nslookup.exe
- name: Potential Command and Control via Internet Explorer
check:
- host.os.type: windows
group_by:
- host.id
- user.name
timeframe: 5
sequence:
- check:
- event.category: library
- dll.name: IEProxy.dll
frequency: 1
- check:
- event.category: process
- event.type: start
- process.parent.name : iexplore.exe
- process.parent.args : -Embedding
frequency: 1
- check:
- event.category: network
- network.protocol: dns
- process.name: iexplore.exe
frequency: 5
- name: Multiple Logon Failure Followed by Logon Success
check:
- event.category: authentication
group_by:
- source.ip
- user.name
timeframe: 5
sequence:
- check:
- event.action: logon-failed
frequency: 5
- check:
- event.action: logon-success
frequency: 1

script

import yaml
import random
import string
import argparse
import json
from typing import Tuple, List, Callable, Generator
from pathlib import Path
from datetime import datetime
from functools import reduce
id = 0
time = 0
base_time = datetime.now().timestamp()
def generate_id() -> str:
global id
id += 1
return str(id)
def get_time(inc: float) -> str:
global time
time += inc
tick = base_time + time
return datetime.fromtimestamp(tick).isoformat()
def get_current_time() -> datetime:
global time
tick = base_time + time
return datetime.fromtimestamp(tick)
def reset_time():
global time
time = 0
def load_templates(input_raw: str) -> dict:
with open(input_raw, 'r') as f:
templates = yaml.safe_load(f)
return templates
def get_value_generator(value_type: str) -> Callable:
if value_type == 'DYN_KEYWORD':
def generate_value() -> str:
return ''.join(random.choices(string.ascii_letters + string.digits, k=10))
elif value_type == 'DYN_IP':
def generate_value() -> str:
return '.'.join([str(random.randint(0, 255)) for _ in range(4)])
else:
def generate_value() -> str:
return value_type
return generate_value
class DynTemplate:
def __init__(self, field: str, value_type: str) -> None:
self.field = field
self.value_type = value_type
self.value_generator = get_value_generator(value_type)
def generate(self) -> Tuple[str, str]:
return self.field, self.value_generator()
def dyn_generator(groups: int, size: int, fields: List[DynTemplate]) -> Callable:
def generator() -> List[List[dict]]:
event_groups = []
for _ in range(groups):
events = []
event_base = dict()
for _, dyn in enumerate(fields):
field, value = dyn.generate()
event_base[field] = value
for _ in range(size):
event = event_base.copy()
event['event.id'] = generate_id()
events.append(event)
event_groups.append(events)
return event_groups
return generator
class Sequencer:
class Step:
def __init__(self, step: dict) -> None:
self.frequency = step['freq']
self.dyn_fields = [DynTemplate(field, value_type)
for field, value_type in step['template'].items()]
def __init__(self, sequence: dict) -> None:
self.window = sequence['window']
self.steps = [self.Step(step) for step in sequence['steps']]
self.interval = (self.window*0.9) / reduce(lambda x,
y: x + y.frequency, self.steps, 0)
def generate(self, success_rate: float, group: List[dict]) -> Generator[dict, None, None]:
step = 0
step_freq = 0
success_max = int(len(group) * success_rate)
success_counter = 0
success_inc = len(group) // success_max
print(
f" Success max: {success_max}, Success inc: {success_inc}")
fail_cause = 0
fail = False
for event in group:
sequenced_event = event.copy()
if success_counter >= success_inc:
fail = True
success_counter = 0
fail_cause += 1
if fail_cause > 1:
fail_cause = 0
if not fail:
success_counter += 1
sequenced_event['event.ingested'] = get_time(self.interval)
if fail and fail_cause == 0:
current_time = get_current_time()
current_time = current_time.replace(hour=current_time.hour + 1 if current_time.hour < 23 else 0)
sequenced_event['event.ingested'] = current_time.isoformat()
fail = False
for dyn in self.steps[step].dyn_fields:
field, value = dyn.generate()
if fail and fail_cause == 1:
value = 'missingstepfield'
fail = False
sequenced_event[field] = value
step_freq += 1
if step_freq >= self.steps[step].frequency:
step_freq = 0
step += 1
if step >= len(self.steps):
step = 0
yield sequenced_event
class EventGenerator:
def __init__(self, template: dict) -> None:
self.name = template['name']
self.total_events = template['total_events']
self.success_rate = template['success']
self.dyn_dist = template['distribution']
self.dyn_groups = template['groups']
self.dyn_size = int(self.total_events *
self.dyn_dist // self.dyn_groups)
self.dyn_fields = [DynTemplate(field, value_type)
for field, value_type in template['template'].items()]
self.dyn_generate = dyn_generator(
self.dyn_groups, self.dyn_size, self.dyn_fields)
self.sequence = Sequencer(template['sequence'])
print(f'\n\n{self.name}')
print(f' Total events: {self.total_events}')
print(f' Groups: {self.dyn_groups}')
print(f' Group size: {self.dyn_size}')
def generate(self) -> List[dict]:
print(f'\n\nGenerating events for {self.name}...')
grouped_events = self.dyn_generate()
final_events = []
for group in grouped_events:
final_events.extend(
[event for event in self.sequence.generate(self.success_rate, group)])
reset_time()
remaining_events = self.total_events - len(final_events)
final_copy = final_events.copy()
print(f' Remaining failure events added: {remaining_events}')
for _ in range(remaining_events):
event = random.choice(final_copy).copy()
dyn_field = random.choice(self.dyn_fields)
event['event.id'] = generate_id()
if dyn_field.value_type == 'DYN_IP':
event[dyn_field.field] = '.'.join(
[str(random.randint(0, 255)) for _ in range(4)])
else:
rand_suffix = ''.join(random.choices(
string.ascii_letters + string.digits, k=10))
value = 'missingdyn' + rand_suffix
event[dyn_field.field] = value
final_events.append(event)
return final_events
def check_files(input: str, output: str) -> Tuple[str, str]:
input_path = Path(input)
output_path = Path(output)
if not input_path.exists():
print(f"Input file {input} does not exist.")
exit(1)
if not input_path.is_file():
print(f"Input file {input} is not a file.")
exit(1)
if output_path.exists():
print(f"Overwriting output file {output}.")
else:
output_path.touch()
print(f"Created output file {output}.")
return input_path.absolute().as_posix(), output_path.absolute().as_posix()
def unflatten(events: List[dict]) -> Generator[dict, None, None]:
for event in events:
unflattened = dict()
for field, value in event.items():
parts = field.split('.')
current = unflattened
for part in parts[:-1]:
if part not in current:
current[part] = dict()
current = current[part]
current[parts[-1]] = value
yield unflattened
def main(input_raw: str, output_raw: str) -> None:
input_path, output_path = check_files(input_raw, output_raw)
templates = load_templates(input_path)
generators = [EventGenerator(template) for template in templates]
final_events = []
for generator in generators:
final_events.extend(generator.generate())
with open(output_path, 'w') as f:
for event in unflatten(final_events):
f.write(json.dumps(event) + '\n')
print('\n\nDone')
exit(0)
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description="Generate events based on given configuration.")
parser.add_argument(
'input', type=str, help="Input file containing the configuration in YML format.")
parser.add_argument(
'output', type=str, help="Output file to store the generated events in JSON format.")
args = parser.parse_args()
main(args.input, args.output)
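As a sanity check, the `unflatten` helper used above can be exercised standalone (the field names below are only illustrative):

```python
from typing import Dict, Generator, List

def unflatten(events: List[dict]) -> Generator[dict, None, None]:
    # Rebuild nested dicts from dot-separated keys ("event.id" -> {"event": {"id": ...}})
    for event in events:
        nested: Dict = {}
        for field, value in event.items():
            parts = field.split('.')
            current = nested
            for part in parts[:-1]:
                current = current.setdefault(part, {})
            current[parts[-1]] = value
        yield nested

flat = [{'event.id': 3, 'event.ingested': '2024-05-08T01:20:09.000Z', 'source.ip': '10.0.0.1'}]
print(next(unflatten(flat)))
# {'event': {'id': 3, 'ingested': '2024-05-08T01:20:09.000Z'}, 'source': {'ip': '10.0.0.1'}}
```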
Daily update

Today we improved the PoC script without changing the running algorithm, and implemented the first version of the benchmarking report script.

New PoC features

After successful tests filling the index, the script was modified to generate output files with alert reports (JSON) and OpenSearch query metrics (CSV), among other minor changes. These modifications will allow performance benchmarking studies to be performed.

PoC updated here

#!python3
import yaml
import argparse
import signal
import sys
from typing import List, Optional, Mapping, Any, Union, Collection
from datetime import datetime, timezone
import time
from opensearchpy import OpenSearch, Transport, TransportError, ConnectionTimeout, ConnectionError # pip install opensearch-py
from prettytable import PrettyTable # pip install prettytable
import json
# OpenSearch connection settings
OS_HOST = "localhost"
OS_PORT = 9200
OS_USER = "admin"
OS_PASS = "wazuhEngine5+"
# Don't forget to change the index name; there is no pattern support in the WHERE clause
INDEX = "wazuh-alerts-5.x-*"
TS_FIELD = "event.ingested"
ID_FIELD = "event.id"
DEBUG_LEVEL = 0
################################################
# Auxiliary functions
################################################
def get_field(event: dict, field: str, exit_on_error: bool = True):
'''
Get the value of a field in the event.
Args:
- event: dict: Event data
- field: str: Field name
- exit_on_error: bool: Exit the program if the field is not found
Returns:
- str: Value of the field in the event
'''
value = event
for key in field.split("."):
try:
value = value[key]
except KeyError:
if exit_on_error:
print(f"{bcolors.FAIL}Error: {field} not found in the event.{bcolors.ENDC}")
exit(1)
return None
return value
def get_list_id(events: List[dict]) -> List[int]:
'''
Get the list of the ids of the events.
Args:
- events: List[dict]: List of the events
Returns:
- List[int]: List of the ids of the events
'''
ids = []
for event in events:
id = get_field(event, ID_FIELD, True)
# If id is not a string or int, exit the program
if not isinstance(id, (str, int)):
print(f"{bcolors.FAIL}Error: {ID_FIELD} must be a string or an integer.{bcolors.ENDC}")
exit(1)
# If id is a string, convert it to int
if isinstance(id, str):
id = int(id)
ids.append(id)
return ids
################################################
# Metrics and Alerts
################################################
class QueryMetrics:
'''
Class that represents the metrics of a query.
'''
def __init__(self, query: str, bytes_sent: int, bytes_received: int, time_r: float) -> None:
self.query = query
self.bytes_sent = bytes_sent
self.bytes_received = bytes_received
self.time_r = time_r
################################################
# Opensearch and printers
################################################
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
class OpenSearchConnector:
'''
Class that represents a connection to OpenSearch.
Attributes:
- opensearch: OpenSearch: OpenSearch object
'''
class CustomTransport(Transport):
def perform_request(
self,
method: str,
url: str,
params: Optional[Mapping[str, Any]] = None,
body: Any = None,
timeout: Optional[Union[int, float]] = None,
ignore: Collection[int] = (),
headers: Optional[Mapping[str, str]] = None,
) -> Any:
"""
Perform the actual request. Retrieve a connection from the connection
pool, pass all the information to its perform_request method and
return the data.
If an exception was raised, mark the connection as failed and retry (up
to `max_retries` times).
If the operation was successful and the connection used was previously
marked as dead, mark it as live, resetting its failure count.
:arg method: HTTP method to use
:arg url: absolute url (without host) to target
:arg headers: dictionary of headers, will be handed over to the
underlying :class:`~opensearchpy.Connection` class
:arg params: dictionary of query parameters, will be handed over to the
underlying :class:`~opensearchpy.Connection` class for serialization
:arg body: body of the request, will be serialized using serializer and
passed to the connection
"""
method, params, body, ignore, timeout = self._resolve_request_args(
method, params, body
)
for attempt in range(self.max_retries + 1):
connection = self.get_connection()
try:
# Calculate bytes sent
bytes_sent = 0
if headers:
bytes_sent += sum(len(k.encode('utf-8')) + len(v.encode('utf-8'))
for k, v in headers.items())
if body:
if isinstance(body, str):
body_bytes = body.encode('utf-8')
elif isinstance(body, bytes):
body_bytes = body
else:
body_bytes = self.serializer.dumps(
body).encode('utf-8')
bytes_sent += len(body_bytes)
if params:
bytes_sent += len(url.encode('utf-8')) + len(
'&'.join(f"{k}={v}" for k, v in params.items()).encode('utf-8'))
else:
bytes_sent += len(url.encode('utf-8'))
# Measure time
start_time = time.time()
status, headers_response, data = connection.perform_request(
method,
url,
params,
body,
headers=headers,
ignore=ignore,
timeout=timeout,
)
# Measure time
end_time = time.time()
time_r = end_time - start_time
# Lowercase all the header names for consistency in accessing them.
headers_response = {
header.lower(): value for header, value in headers_response.items()
}
# Calculate bytes received
bytes_received = 0
if headers_response:
bytes_received += sum(len(k.encode('utf-8')) + len(v.encode('utf-8'))
for k, v in headers_response.items())
if data:
bytes_received += len(data)
except TransportError as e:
if method == "HEAD" and e.status_code == 404:
return False
retry = False
if isinstance(e, ConnectionTimeout):
retry = self.retry_on_timeout
elif isinstance(e, ConnectionError):
retry = True
elif e.status_code in self.retry_on_status:
retry = True
if retry:
try:
# only mark as dead if we are retrying
self.mark_dead(connection)
except TransportError:
# If sniffing on failure, it could fail too. Catch the
# exception not to interrupt the retries.
pass
# raise exception on last retry
if attempt == self.max_retries:
raise e
else:
raise e
else:
# connection didn't fail, confirm its live status
self.connection_pool.mark_live(connection)
if method == "HEAD":
return 200 <= status < 300
if data:
data = self.deserializer.loads(
data, headers_response.get("content-type")
)
# Return data and metrics
return data, bytes_sent, bytes_received, time_r
def __init__(self):
'''
Create an OpenSearchConnector connector.
The object connects to OpenSearch and checks if the SQL plugin is enabled.
'''
self.query_metrics: List[QueryMetrics] = []
self.opensearch = OpenSearch(
[{"host": OS_HOST, "port": OS_PORT}],
http_auth=(OS_USER, OS_PASS),
http_compress=True,
use_ssl=True,
verify_certs=False,
timeout=30,
ssl_show_warn=False,
transport_class=self.CustomTransport
)
if not self.opensearch.ping():
# Show the error message
print(self.opensearch.info())
exit(1)
# Check if sql plugin is enabled
list_json_plugins, _, _, _ = self.opensearch.cat.plugins(
params={"s": "component", "v": "true", "format": "json"})
list_plugins = [plugin["component"] for plugin in list_json_plugins]
if "opensearch-sql" not in list_plugins:
print("The SQL plugin is not enabled.")
exit(1)
def log_query(self, query: QueryMetrics) -> None:
self.query_metrics.append(query)
def dump_metrics(self) -> str:
total_bytes_sent = sum(
[query.bytes_sent for query in self.query_metrics])
total_bytes_received = sum(
[query.bytes_received for query in self.query_metrics])
total_time = sum([query.time_r for query in self.query_metrics])
dump = f"Total queries: {len(self.query_metrics)}\nTotal bytes sent: {total_bytes_sent}\nTotal bytes received: {total_bytes_received}\nTotal time: {total_time} seconds\n"
mean_time = total_time / len(self.query_metrics)
mean_bytes_sent = total_bytes_sent / len(self.query_metrics)
mean_bytes_received = total_bytes_received / len(self.query_metrics)
dump += f"Mean time: {mean_time} seconds\nMean bytes sent: {mean_bytes_sent}\nMean bytes received: {mean_bytes_received}\n"
for i, query in enumerate(self.query_metrics):
dump += f"\nQuery {i + 1}\n{query.query}\nBytes sent: {query.bytes_sent}\nBytes received: {query.bytes_received}\nTime: {query.time_r} seconds\n"
return dump
def generate_event_table(keys, hits) -> str:
'''
Generate a table with the events, showing only the keys specified.
Args:
- keys: List[str]: List of the keys to show in the table
- hits: List[dict]: List of the events to show in the table
'''
# Create a PrettyTable object
table = PrettyTable()
# Set columns from the keys
# table.field_names = [k.split(".")[-1] for k in keys]
keys = keys + [ID_FIELD]  # Copy so the caller's list is not mutated
table.field_names = keys
# Add rows to the table
for entry in hits:
# Ensure each key is present in the dictionary to avoid KeyError
row = []
for k in keys:
it = entry
for subk in k.split("."):
try:
it = it[subk]
except KeyError:
it = "-"
break
row.append(it)
table.add_row(row)
# Set table style
table.border = True
table.horizontal_char = '-'
table.vertical_char = '|'
table.junction_char = '+'
# Return the table as a string
return table.get_string()
################################################
# Rule components
################################################
class Entry:
'''
Class that represents an event in the sequence.
Attributes:
- step: int: Step of the event in the sequence
- event: dict: Event data
- timestamp: int: Timestamp of the event
'''
def __init__(self, step: int, event: dict) -> None:
'''
Create an Entry object.
Args:
- step: int: Step of the event in the sequence
- event: dict: Event data
'''
self.step: int = step
self.event = event
# Get the timestamp of the event
field = get_field(event, TS_FIELD)
# If field is not string, convert it to string
if not isinstance(field, str):
field = str(field)
field = field.replace('Z', '+00:00')
date_obj = datetime.fromisoformat(field)
self.timestamp = int(date_obj.timestamp())
class Cache:
'''
Class that represents a cache for the events; it accumulates the events of the same sequence group.
Attributes:
- elements: list: Entries accumulated in the cache
'''
def __init__(self):
self.elements = []
def to_str(self, fields: List[str]) -> str:
'''
Cache to string
Args:
- fields: List[str]: List of the fields to print
'''
return generate_event_table(fields, [entry.event for entry in self.elements])
def __len__(self):
return len(self.elements)
class _sql_static_field:
'''
Class that represents a field filter in the query.
A field filter is a field that should be equal to a specific value.
Attributes:
- field: str: Field name
- value: str: Value to compare
'''
def __init__(self, field: str, value: str):
self.field = field
self.value = value
def get_query(self):
'''
Get the sql condition for the field filter.
Returns:
- str: SQL condition for the field filter I.E. (field = value)
'''
return f'{self.field} = "{self.value}"'
def evaluate(self, hit: dict) -> bool:
'''
Evaluate if the hit event match the field filter.
Args:
- hit: dict: Hit event from OpenSearch to evaluate
Returns:
- bool: True if the event match the field filter, False otherwise
'''
field = get_field(hit, self.field, False)
if field is None:
return False
# if field is a list, take the first element
if isinstance(field, list):
field = field[0]
return field == self.value
class _sql_dinamic_field:
'''
Class that represents a dynamic field filter in the query, used for group-by fields.
A dynamic field filter only requires the field to exist; no value is specified.
Attributes:
- field: str: Field name
'''
def __init__(self, field: str):
self.field = field
def get_query(self) -> str:
'''
Get the sql query condition for the field filter.
Returns:
- str: SQL condition for the field exists I.E. (field IS NOT NULL)
'''
return f'{self.field} IS NOT NULL'
def evaluate(self, hit: dict) -> bool:
'''
Evaluate if the hit event match the field filter.
Args:
- hit: dict: Hit event from OpenSearch to evaluate
Returns:
- bool: True if the event match the field filter, False otherwise
'''
field = get_field(hit, self.field, False)
return field is not None
class Step:
"""
Represent a step in the sequence.
Attributes:
- filter: list[_sql_static_field]: Required fields with values to filter the events (Mandatory)
- frequency: int: Frequency of the step (default 1 hit)
- group_by_fields: list[_sql_dinamic_field]: Fields whose values must match across events of the step (optional)
"""
def __init__(self, filter: List[_sql_static_field], frequency: int, group_by_fields: List[_sql_dinamic_field]):
self.filter: List[_sql_static_field] = filter
self.frequency: int = frequency
self.group_by_fields: List[_sql_dinamic_field] = group_by_fields
def get_query(self) -> str:
'''
Get the query for the step
The query is a string that represents the step to fetch the events from OpenSearch.
It combines the filter fields and the group-by fields with AND; group-by fields only need to exist.
Returns:
- str: SQL query for the step
'''
query = ' AND '.join([field.get_query() for field in self.filter + self.group_by_fields])
return f'({query})'
def evaluate(self, hit: dict) -> bool:
'''
Evaluate if the hit event match the step.
Args:
- hit: dict: Hit event from OpenSearch to evaluate
Returns:
- bool: True if the event match the step, False otherwise
'''
for field in self.filter:
if not field.evaluate(hit):
return False
for field in self.group_by_fields:
if not field.evaluate(hit):
return False
return True
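# Illustrative example (hypothetical field names, not from the rule set): a step
# with filter [{'event.category': 'authentication'}] and group_by ['source.ip']
# produces via get_query():
#   (event.category = "authentication" AND source.ip IS NOT NULL)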
class Rule:
"""
Rule class that contains the information of a rule.
Attributes:
- name: str - Name of the rule
- timeframe: int - Timeframe for the rule
- group_by_fields: list[str] - Fields that should be the same (optional)
- static_fields: dict[str, str] - Fields that should be equal to a specific value (optional)
- last_ingested: int - Last event fetched
- _sequence: list[Step] - Sequence of the rule
- _caches: dict - Cache of the events, each key is a hash of the same fields
"""
def __init__(self, name: str, timeframe: int, group_by_fields: 'list[str]', static_fields: 'dict[str, str]'):
'''
Create a Rule object.
Args:
- name: str: Name of the rule
- timeframe: int: Timeframe for the rule
- group_by_fields: list[str]: Fields that should be the same (optional)
- static_fields: dict[str, str]: Fields that should be equal to a specific value (optional)
'''
self.name: str = name
self.timeframe: int = timeframe
self.group_by_fields: List[_sql_dinamic_field] = [_sql_dinamic_field(field) for field in group_by_fields]
self.static_fields: List[_sql_static_field] = [_sql_static_field(
field, value) for field, value in static_fields.items()]
self._sequence: List[Step] = []
self.last_ingested: int = 0
self._caches: dict = {}
self._alerts: List[List[int]] = []
def add_step(self, step: Step):
'''
Add a step to the sequence of the rule.
Args:
- step: Step: Step to add to the sequence
'''
# Verify that the step has the same number of group-by fields as the previous step
if len(self._sequence) > 0 and len(step.group_by_fields) != len(self._sequence[-1].group_by_fields):
raise ValueError('Error: The step does not have the same number of group-by fields as the previous step.')
self._sequence.append(step)
def _get_global_query_condition(self):
'''
Get the global query condition for the rule.
The global query condition is a combination of the same fields and static fields in AND condition,
all events should match this condition.
Returns:
- str: SQL query for the global condition
'''
query = ' AND '.join([field.get_query() for field in self.group_by_fields + self.static_fields])
# If there is no global query return an empty string
if query == '':
return ''
return f'({query})'
def _get_condition(self) -> str:
'''
Get the condition query for the rule
The query is a string that represents the rule to fetch the events from OpenSearch.
It is the global condition ANDed with the step conditions joined by OR.
Returns:
- str: Query for the rule
'''
global_query = self._get_global_query_condition()
sequence_query = ' OR '.join(
[step.get_query() for step in self._sequence])
# If there is no global query return the sequence query
if global_query == '':
return sequence_query
return f'{global_query} AND ({sequence_query})'
def get_query(self) -> str:
'''
Get the query for the rule
The query is a string that represents the rule to fetch the events from OpenSearch.
'''
time_str = datetime.fromtimestamp(self.last_ingested, timezone.utc).strftime('%Y-%m-%dT%H:%M:%S')
query_str = f"SELECT * FROM {INDEX} AS idx WHERE {TS_FIELD} > '{time_str}' AND ({self._get_condition()}) ORDER BY {TS_FIELD} ASC;"
return query_str
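# Illustrative full query (hypothetical values): with last_ingested = 0 and a
# single-step rule grouped globally by source.ip, get_query() returns roughly:
#   SELECT * FROM wazuh-alerts-5.x-* AS idx
#   WHERE event.ingested > '1970-01-01T00:00:00'
#   AND ((source.ip IS NOT NULL) AND ((...step condition...)))
#   ORDER BY event.ingested ASC;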
def _list_fields(self) -> List[str]:
'''
List all unique fields interested in the rule.
Returns:
- List[str]: List of the fields of the rule
'''
fields: List[str] = [TS_FIELD]
for field in self.group_by_fields:
fields.append(field.field)
for field in self.static_fields:
fields.append(field.field)
for step in self._sequence:
for field in step.filter:
fields.append(field.field)
for field in step.group_by_fields:
fields.append(field.field)
# Remove duplicates
fields = list(dict.fromkeys(fields))
return fields
def _fetch_events(self, osc: OpenSearchConnector) -> List[dict]:
'''
Fetch the events from OpenSearch.
The function fetches the events since the last ingested event.
'''
if DEBUG_LEVEL > 0:
print(f'{bcolors.BOLD}{bcolors.OKBLUE}Fetching events...{bcolors.ENDC}{bcolors.ENDC}')
response = None
query = self.get_query()
try:
response, bytes_sent, bytes_received, time_r = osc.opensearch.transport.perform_request(
url="/_plugins/_sql/",
method="POST",
params={"format": "json", "request_timeout": 30},
body={"query": query}
)
osc.log_query(QueryMetrics(query, bytes_sent, bytes_received, time_r))
except Exception as e:
print(f"Error: {e}")
exit(1)
# Not sure if this is correct: check for errors in the response
if 'error' in response:
print(f"Error: {response['error']['reason']}")
exit(1)
if 'hits' not in response or 'hits' not in response['hits'] or len(response['hits']['hits']) == 0:
return []
# Save the last time of the query
last_ingested_str = get_field(response['hits']['hits'][-1]['_source'], TS_FIELD)  # Respect TS_FIELD instead of hardcoding the path
self.last_ingested = int(datetime.fromisoformat(
last_ingested_str.replace('Z', '+00:00')).timestamp())
if DEBUG_LEVEL > 0:
print(f"Last ingested: {last_ingested_str}")
if DEBUG_LEVEL > 1:
print(f"Query: {query}")
# Create a list of event of the response
hit_list = response['hits']['hits']
# Check the events
events: List[dict] = []
for hit in hit_list:
events.append(hit['_source'])
if DEBUG_LEVEL > 0:
print(f"Events fetched: {len(events)}")
if DEBUG_LEVEL > 1:
print(f'{bcolors.OKGREEN}{generate_event_table(self._list_fields(), events)}{bcolors.ENDC}')
return events
def _fill_cache(self, events: List[dict]):
'''
Fill the cache with the events.
The function inserts each event into the cache bucket that matches its group-by values.
'''
if DEBUG_LEVEL > 0:
print(f'{bcolors.BOLD}{bcolors.OKBLUE}Filling cache...{bcolors.ENDC}{bcolors.ENDC}')
# Check which stage an event belongs to
for event in events:
entry = None
# Check if the event match a Step
for i, step in enumerate(self._sequence):
if step.evaluate(event):
entry = Entry(i, event)
break
if entry is None:
print(f"{bcolors.FAIL}Error: Event does not match any step.{bcolors.ENDC}")
exit(1)
# Get the values of the rule-level and step-level group-by fields
rule_group_by_fields_values = [get_field(event, field.field) for field in self.group_by_fields]
group_by_fields_values = [get_field(event, field.field)
for field in self._sequence[entry.step].group_by_fields]
obj_hash = hash(tuple(rule_group_by_fields_values + group_by_fields_values))
str_hash = str(obj_hash)
if DEBUG_LEVEL > 2:
print(f"---- ANALYZING EVENT ----")
# print(f"Event: {event}")
print(f"Event ts: {get_field(event, TS_FIELD)}")
print(f"Step: {entry.step}")
print(f"Same fields: {rule_group_by_fields_values}")
print(f"Equal fields: {group_by_fields_values}")
print(f"Hash: {str_hash}")
# Search the cache
if str_hash in self._caches:
cache = self._caches[str_hash]
cache.elements.append(entry)
else:
cache = Cache()
cache.elements.append(entry)
self._caches[str_hash] = cache
# Print the cache
if DEBUG_LEVEL > 0:
print(f'{bcolors.BOLD}{bcolors.HEADER}*=============== New cache state for rule: {self.name} ===============*{bcolors.ENDC}{bcolors.ENDC}')
for key, cache in self._caches.items():
print(f"Key: {key}")
print(f'{bcolors.OKGREEN}{cache.to_str(self._list_fields())}{bcolors.ENDC}')
class _match_result:
'''
Class that represents the result of the match of the cache with the sequence.
Attributes:
- dirty: bool: True if the cache was modified
- events: list[dict]: List of the events that match the sequence
'''
def __init__(self, cache_modified: bool, events: Optional[List[dict]] = None):
# Avoid a mutable default argument; create a fresh list per instance
self._events: List[dict] = events if events is not None else []
self._cache_modified: bool = cache_modified
def __bool__(self) -> bool:
return self._cache_modified
def alert_events(self) -> List[dict]:
return self._events
def _match_sequence(self, cache: Cache) -> _match_result:
'''
Check if the cache match the sequence.
Args:
- cache: Cache: Cache to check
Returns:
- _match_result: Result of the match
'''
current: int = 0 # Current event index
step_index: int = 0 # Current step index
hit_counter: int = 0 # Current count of events that match the step condition
# List index of events that match the condition of the step
list_success_events: List[int] = []
# Iterate over the events in the cache
while current < len(cache.elements) and step_index < len(self._sequence):
# Check which step the event matches
event = cache.elements[current]
# If the event is from the next step, remove the event from the cache
if event.step > step_index:
cache.elements.pop(current)
continue
# If the event is from the previous step, skip it
if event.step < step_index:
current += 1
continue
# if is the same step, check if the event is out of the timeframe
if event.timestamp - cache.elements[0].timestamp > self.timeframe:
# Remove first element, and try all steps again
cache.elements.pop(0)
return self._match_result(True)
# The event meets the step condition
hit_counter += 1 # Increase the hit counter
# Add the event to the list of successful events
list_success_events.append(current)
current += 1 # Move to the next event
# Check if the step condition was met the required number of times
if hit_counter == self._sequence[step_index].frequency:
# Move to the next step
step_index += 1
hit_counter = 0
# Matched all the steps
if step_index == len(self._sequence):
# Extract the matched events from the cache
alert_event: List[dict] = []
for i in reversed(list_success_events):
alert_event.append(cache.elements.pop(i).event)
alert_event.reverse()
return self._match_result(True, alert_event)
return self._match_result(False)
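# Sketch of one _match_sequence pass (hypothetical data): for a sequence of two
# steps with frequency 1, cached entries [step0, step0, step1] match on the
# first and third entries; the extra step0 entry stays in the cache. Any entry
# whose timestamp is more than `timeframe` seconds after the first cached entry
# evicts that first entry and reports the cache as dirty, so match() rescans.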
def match(self) -> None:
'''
Match the events in the cache with the sequence.
The function checks the events in each cache and tries to match the sequence.
- If the sequence is matched, the events are removed from the cache and stored in the _alerts attribute.
- If any event is unnecessary, it is removed from the cache.
'''
# Iterate over the caches
print(f'{bcolors.BOLD}{bcolors.HEADER}*=============== Matching sequences for rule: {self.name} ===============*{bcolors.ENDC}{bcolors.ENDC}')
new_caches_state: bool = False
for key, cache in self._caches.items():
# Check if the cache matches the sequence and return the events
while result := self._match_sequence(cache):
new_caches_state = True
if len(result.alert_events()) > 0:
print(f'{bcolors.BOLD}{bcolors.UNDERLINE}{bcolors.OKCYAN}Matched sequence:{bcolors.ENDC}{bcolors.ENDC}{bcolors.ENDC}')
print(f'source cache key: {key}')
print(f'{bcolors.OKCYAN}{generate_event_table(self._list_fields(), result.alert_events())}{bcolors.ENDC}')
print()
self._alerts.append(get_list_id(result.alert_events()))
if DEBUG_LEVEL > 0 and new_caches_state:
print(f'{bcolors.BOLD}{bcolors.HEADER}*=============== New cache state for rule: {self.name} ===============*{bcolors.ENDC}{bcolors.ENDC}')
for key, cache in self._caches.items():
print(f"Key: {key}")
print(f'{bcolors.OKGREEN}{cache.to_str(self._list_fields())}{bcolors.ENDC}')
def update(self, osc: OpenSearchConnector) -> bool:
'''
Fetch the events from OpenSearch, update cache and match the sequence.
The function fetches the events since the last ingested event and updates the cache with the new events.
Returns:
- False if no events were fetched, True otherwise
Args:
- osc: OpenSearchConnector: OpenSearchConnector object
'''
if DEBUG_LEVEL > 0:
print(f'{bcolors.BOLD}{bcolors.HEADER}*=============== Updating rule: {self.name} ===============*{bcolors.ENDC}{bcolors.ENDC}')
events = self._fetch_events(osc)
if len(events) == 0:
if DEBUG_LEVEL > 0:
print(f'{bcolors.WARNING}No events fetched.{bcolors.ENDC}')
return False
self._fill_cache(events)
self.match()
return True
################################################
# Dumpers
################################################
def dump_alerts_as_json(rules: List[Rule]) -> str:
'''
Dump the alerts as a JSON string.
Dump alerts with the format:
[
{
"rule_id": 1,
"rule_name": "Rule 1",
"num_alerts": 2,
"alerts": [ [1, 2, 3], [4, 5, 6] ]
},
{
"rule_id": 2,
"rule_name": "Rule 2",
"num_alerts": 2,
"alerts": [ [1, 2, 3], [4, 5, 6] ]
}
]
Args:
- rules: List[Rule]: List of the rules
Returns:
- str: JSON string with the alerts
'''
alerts = []
for i, rule in enumerate(rules):
alerts.append({
"rule_id": i,
"rule_name": rule.name,
"num_alerts": len(rule._alerts),
"alerts": rule._alerts
})
# Dump the alerts as a JSON string with '"' as separators
return json.dumps(alerts, separators=(',', ':'))
def dump_query_metrics_as_csv(list_metrics: List[QueryMetrics]) -> str:
'''
Dump the query metrics as csv string.
Dump the query metrics with the format:
id,bytes_sent,bytes_received,time_r
1,100,200,0.1
Args:
- list_metrics: List[QueryMetrics]: List of the query metrics
Returns:
- str: CSV string with the query metrics
'''
# Create the CSV string
csv_str = 'id,bytes_sent,bytes_received,time_r\n'
for i, metric in enumerate(list_metrics):
csv_str += f'{i},{metric.bytes_sent},{metric.bytes_received},{metric.time_r}\n'
return csv_str
################################################
# Parsers
################################################
def parse_yaml(file_path: str) -> dict:
'''
Parse a YAML file and return the data as a dictionary.
'''
with open(file_path, 'r') as file:
data = yaml.safe_load(file)
return data
def parse_step(sequence: dict) -> Step:
# Filter events to fetch on OpenSearch (Mandatory)
static_fields_list: list[_sql_static_field] = []
frequency: int = 1 # Frequency of the step (default 1 hit)
# Fields that should be equal (optional)
group_by_fields: list[_sql_dinamic_field] = []
# Get the filter (if exists should be a map of field and value strings)
try:
raw_check = sequence['check']
# Check if the filter is a list of object with key and value strings
if not isinstance(raw_check, list) or not all(isinstance(field, dict) for field in raw_check):
raise ValueError('Error: filter must be an array of object with key and value strings.')
# iterate over the filter (raw_check) adding them to the static_fields_list
for field in raw_check:
for field_name, field_value in field.items():
static_fields_list.append(_sql_static_field(field_name, field_value))
except KeyError:
raise ValueError('Error: filter not found in the step.')
# Get the frequency (if present, it should be a positive integer)
try:
frequency = sequence['frequency']
# Check if the frequency is an integer and positive
if not isinstance(frequency, int) or frequency < 1:
raise ValueError('Error: frequency must be a positive integer.')
except KeyError:
pass
# Get the equal fields (if exists should be a list of strings)
try:
raw_group_by = sequence['group_by']
# Check if the raw_group_by is a list of strings
if not isinstance(raw_group_by, list) or not all(isinstance(field, str) for field in raw_group_by):
raise ValueError('Error: group_by_fields must be a list of strings.')
# Create the group_by_fields
group_by_fields = [_sql_dinamic_field(field) for field in raw_group_by]
except KeyError:
pass
return Step(static_fields_list, frequency, group_by_fields)
def parse_rule(rule: dict) -> Rule:
timeframe: int = 0 # Timeframe for the rule
group_by_fields: list[str] = [] # Fields that should be the same (optional)
# Fields that should be equal to a specific value (optional)
static_fields: dict[str, str] = {}
name: str = "No name"
# Get the name of the rule
try:
name = rule['name']
except KeyError:
pass
# Get the timeframe
try:
timeframe = rule['timeframe']
# Check if the timeframe is an integer and positive
if not isinstance(timeframe, int) or timeframe < 1:
raise ValueError('Error: timeframe must be a positive integer.')
except KeyError:
raise ValueError('Error: timeframe not found in the rule.')
# Get the same fields (if exists should be a list of strings)
try:
group_by_fields = rule['group_by']
# Check if the group_by_fields is a list of strings
if not isinstance(group_by_fields, list) or not all(isinstance(field, str) for field in group_by_fields):
raise ValueError('Error: group_by_fields must be a list of strings.')
except KeyError:
pass
# Get the static fields (if exists should be a list of strings)
try:
raw_static_fields = rule['check']
# Check if the static_fields is a list of object with key and value strings
if not isinstance(raw_static_fields, list) or not all(isinstance(field, dict) for field in raw_static_fields):
raise ValueError('Error: static_fields must be a list of object with key and value strings.')
# iterate over the static fields (raw_static_fields) adding them to the static_fields
for field in raw_static_fields:
for field_name, field_value in field.items():
static_fields[field_name] = field_value
except KeyError:
pass
# Check if the sequence is a list of steps
try:
sequence = rule['sequence']
# Check if the sequence is a list of steps
if not isinstance(sequence, list) or not all(isinstance(step, dict) for step in sequence):
raise ValueError('Error: sequence must be a list of steps.')
except KeyError:
raise ValueError('Error: sequence not found in the rule.')
# Create the rule
r = Rule(name, timeframe, group_by_fields, static_fields)
# Parse the steps
for step in sequence:
s = parse_step(step)
r.add_step(s)
return r
################################################
def main(args):
# Load the rule definition from the YAML file
yaml_file = args.i
# Set fast mode, end if no event is fetched, no sleep
fast_mode: bool = False
if args.f:
fast_mode = args.f
# Truncate the output files
if args.a:
with open(args.a, "w") as f:
f.write("")
if args.s:
with open(args.s, "w") as f:
f.write("")
osc = OpenSearchConnector()
rule_definitions = parse_yaml(yaml_file)
rule_list: List[Rule] = []
# Check if rule_definition is a list of rules (Array of objects)
if not isinstance(rule_definitions, list) or not all(isinstance(rule_definition, dict)
for rule_definition in rule_definitions):
raise ValueError('Error: rule_definition must be an array of objects.')
for rule_definition in rule_definitions:
rule = parse_rule(rule_definition)
rule_list.append(rule)
try:
while True:
event_fetched : bool = False
for rule in rule_list:
event_fetched = rule.update(osc) or event_fetched
if not fast_mode:
time.sleep(1)
elif not event_fetched:
                raise KeyboardInterrupt  # Reuse the shutdown path when fast mode runs out of events
except KeyboardInterrupt:
if DEBUG_LEVEL > 0:
print(f'{bcolors.BOLD}{bcolors.HEADER}*=============== Alerts ===============*{bcolors.ENDC}{bcolors.ENDC}')
            print(dump_alerts_as_json(rule_list))
            print(f'{bcolors.BOLD}{bcolors.HEADER}*=============== Query Stats ===============*{bcolors.ENDC}{bcolors.ENDC}')
print(dump_query_metrics_as_csv(osc.query_metrics))
#print(osc.dump_metrics())
# Save the alerts
if args.a:
print(f"{bcolors.OKGREEN}Saving alerts to {args.a}...{bcolors.ENDC}")
with open(args.a, "a") as f:
f.write(dump_alerts_as_json(rule_list))
# Save the query metrics
if args.s:
print(f"{bcolors.OKGREEN}Saving query metrics to {args.s}...{bcolors.ENDC}")
with open(args.s, "a") as f:
f.write(dump_query_metrics_as_csv(osc.query_metrics))
print(f"{bcolors.WARNING}Exiting...{bcolors.ENDC}")
if __name__ == "__main__":
# Rule input
parser = argparse.ArgumentParser(description='Load a sequence rule.')
parser.add_argument(
'-i', type=str, help='Path to the input YAML file.', required=True)
# Optional alert output
parser.add_argument('-a', type=str, help='Path to the output alert file.')
# Optional stats output
parser.add_argument('-s', type=str, help='Path to the output stats file.')
# Optional end if no event is fetched
    # Note: argparse's type=bool treats any non-empty string as True, so use a flag instead
    parser.add_argument('-f', action='store_true', help='Fast mode, end if no event is fetched. No sleep.')
args = parser.parse_args()
    main(args)

Report analysis - Alpha version

We worked on the first script, which will be in charge of reading the outputs of the PoC executions and generating reports that will be useful for comparisons between them.

Report generator here

#!/usr/bin/env python
import pandas as pd
import matplotlib.pyplot as plt # pip install matplotlib
import argparse
import os
def analyze_csv(file_path):
# Open CSV file
df = pd.read_csv(file_path)
# Metrics
total_time = df['time_r'].sum()
total_bytes_received = df['bytes_received'].sum()
query_count = df['id'].count()
avg_bytes_per_query = total_bytes_received / query_count
avg_bytes_sent_per_query = df['bytes_sent'].mean()
metrics = {
"total_time": total_time,
"total_bytes_received": total_bytes_received,
"query_count": query_count,
"avg_bytes_per_query": avg_bytes_per_query,
"avg_bytes_sent_per_query": avg_bytes_sent_per_query
}
# Plotting
plt.figure(figsize=(10, 6))
    # Received bytes vs time
    plt.scatter(df['bytes_received'], df['time_r'])
    plt.title('Received Bytes vs Time')
    plt.xlabel('Received Bytes')
plt.ylabel('Response Time (s)')
plt.grid(True)
plt.savefig(f'{os.path.splitext(file_path)[0]}_bytes_vs_time.png')
plt.close()
# Histogram of response time
plt.hist(df['time_r'], bins=20)
plt.title('Histogram of Response Time')
plt.xlabel('Response Time (s)')
plt.ylabel('Frequency')
plt.grid(True)
plt.savefig(f'{os.path.splitext(file_path)[0]}_time_histogram.png')
plt.close()
# Bytes sent vs Bytes received
plt.scatter(df['bytes_sent'], df['bytes_received'])
plt.title('Bytes Sent vs Bytes Received')
plt.xlabel('Sent Bytes')
plt.ylabel('Received Bytes')
plt.grid(True)
plt.savefig(f'{os.path.splitext(file_path)[0]}_sent_vs_received.png')
plt.close()
# Bytes received per query
plt.bar(df['id'], df['bytes_received'])
plt.title('Bytes Received per Query')
plt.xlabel('Query ID')
plt.ylabel('Received Bytes')
plt.grid(True)
plt.savefig(f'{os.path.splitext(file_path)[0]}_bytes_per_query.png')
plt.close()
# Return metrics as json string with " as quotes
return str(metrics).replace("'", "\"")
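# A standalone sketch (hypothetical metric names and numbers) of the kind of
# cross-file comparison main() could add once several runs have been analyzed:
# the relative change of each numeric metric shared by two runs.
def _compare_metrics(a: dict, b: dict) -> dict:
    # Relative change of every metric present in both runs (skip zero baselines)
    return {k: (b[k] - a[k]) / a[k] for k in a.keys() & b.keys() if a[k]}
# For example, _compare_metrics({"total_time": 2.0}, {"total_time": 3.0})
# yields {"total_time": 0.5}, i.e. the second run spent 50% more time.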
def main(input_files):
all_metrics = {}
for file_path in input_files:
metrics = analyze_csv(file_path)
all_metrics[file_path] = metrics
print(f'Metrics for {file_path}: {metrics}')
# TODO Add comparison between files
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Analyze and compare CSV files from PoC tests.')
parser.add_argument('input_files', nargs='+', help='List of CSV files to analyze')
args = parser.parse_args()
    main(args.input_files)

Example outputs

{
"total_time": 1.44719505310058,
"total_bytes_received": 12900122,
"query_count": 123,
"avg_bytes_per_query": 104879.0406504065,
"avg_bytes_sent_per_query": 395.0
}
Daily update

Brief
PoC

The timestamp issue

A problem related to event fetching has been discovered. It is not a serious problem. When fetching events, a limit (default 200) on how many events to fetch per query is set, and the follow-up query only asks for events with a strictly greater timestamp than the last one ingested; if the limit cuts a page in the middle of a group of events sharing that timestamp, the remaining events of the group are skipped. There are several solutions, such as querying for equal or greater timestamps and hashing the events to discard the ones already seen, and others to be investigated, such as using cursors.

It can be seen in an intensive test, when the PoC has to fetch a large number of events for each rule and exceeds this limit. We doubt that this fetching load is the general behavior, since over time it should only fetch new events that may become part of a sequence.

Testing example over 30000 events (10000 per rule)

Limit 200

"rule_name":"Potential DNS Tunneling via NsLookup","num_alerts":468
"rule_name":"Potential Command and Control via Internet Explorer","num_alerts":262,
"rule_name":"Multiple Logon Failure Followed by Logon Success","num_alerts":377,

Limit 2000

"rule_name":"Potential DNS Tunneling via NsLookup","num_alerts":470,
"rule_name":"Potential Command and Control via Internet Explorer","num_alerts":290,
"rule_name":"Multiple Logon Failure Followed by Logon Success","num_alerts":380,

Limit 10000

"rule_name":"Potential DNS Tunneling via NsLookup","num_alerts":470
"rule_name":"Potential Command and Control via Internet Explorer","num_alerts":290,
"rule_name":"Multiple Logon Failure Followed by Logon Success","num_alerts":380,

Performance with change of limits

The PoC was modified to ask the API for a certain number of events per query, resulting in

PoC script here

#!/usr/bin/env python3
import yaml
import argparse
import signal
import sys
from typing import List, Optional, Mapping, Any, Union, Collection
from datetime import datetime, timezone
import time
from opensearchpy import OpenSearch, Transport, TransportError, ConnectionTimeout, ConnectionError # pip install opensearch-py
from prettytable import PrettyTable # pip install prettytable
import json
import os
# OpenSearch connection settings
OS_HOST = "localhost"
OS_PORT = 9200
OS_USER = "admin"
OS_PASS = "wazuhEngine5+"
# Don't forget to change the index name; no pattern support in the WHERE clause
INDEX = "wazuh-alerts-5.x-*"
TS_FIELD = "event.ingested"
ID_FIELD = "event.id"
DEBUG_LEVEL = 0
LIMIT = "10000"
################################################
# Auxiliary functions
################################################
def get_field(event: dict, field: str, exit_on_error: bool = True):
'''
Get the value of a field in the event.
Args:
- event: dict: Event data
- field: str: Field name
- exit_on_error: bool: Exit the program if the field is not found
Returns:
- str: Value of the field in the event
'''
value = event
for key in field.split("."):
try:
value = value[key]
except KeyError:
if exit_on_error:
print(f"{bcolors.FAIL}Error: {field} not found in the event.{bcolors.ENDC}")
exit(1)
return None
return value
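# Standalone sketch of the dotted-path lookup implemented by get_field above
# (hypothetical event shape; returns None on a missing path instead of exiting):
def _lookup(event: dict, path: str):
    value = event
    for key in path.split('.'):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value
assert _lookup({"event": {"id": "7"}}, "event.id") == "7"
assert _lookup({"event": {}}, "event.id") is None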
def get_list_id(events: List[dict]) -> List[int]:
'''
Get the list of the ids of the events.
Args:
- events: List[dict]: List of the events
Returns:
- List[int]: List of the ids of the events
'''
    ids = []
    for event in events:
        event_id = get_field(event, ID_FIELD, True)
        # If the id is not a string or an int, exit the program
        if not isinstance(event_id, (str, int)):
            print(f"{bcolors.FAIL}Error: {ID_FIELD} must be a string or an integer.{bcolors.ENDC}")
            exit(1)
        # If the id is a string, convert it to an int
        if isinstance(event_id, str):
            event_id = int(event_id)
        ids.append(event_id)
    return ids
################################################
# Metrics and Alerts
################################################
class QueryMetrics:
'''
Class that represents the metrics of a query.
'''
def __init__(self, query: str, bytes_sent: int, bytes_received: int, time_r: float) -> None:
self.query = query
self.bytes_sent = bytes_sent
self.bytes_received = bytes_received
self.time_r = time_r
################################################
# Opensearch and printers
################################################
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
class OpenSearchConnector:
'''
Class that represents a connection to OpenSearch.
Attributes:
- opensearch: OpenSearch: OpenSearch object
'''
class CustomTransport(Transport):
def perform_request(
self,
method: str,
url: str,
params: Optional[Mapping[str, Any]] = None,
body: Any = None,
timeout: Optional[Union[int, float]] = None,
ignore: Collection[int] = (),
headers: Optional[Mapping[str, str]] = None,
) -> Any:
"""
Perform the actual request. Retrieve a connection from the connection
pool, pass all the information to its perform_request method and
return the data.
If an exception was raised, mark the connection as failed and retry (up
to `max_retries` times).
If the operation was successful and the connection used was previously
marked as dead, mark it as live, resetting its failure count.
:arg method: HTTP method to use
:arg url: absolute url (without host) to target
:arg headers: dictionary of headers, will be handed over to the
underlying :class:`~opensearchpy.Connection` class
:arg params: dictionary of query parameters, will be handed over to the
underlying :class:`~opensearchpy.Connection` class for serialization
:arg body: body of the request, will be serialized using serializer and
passed to the connection
"""
method, params, body, ignore, timeout = self._resolve_request_args(
method, params, body
)
for attempt in range(self.max_retries + 1):
connection = self.get_connection()
try:
# Calculate bytes sent
bytes_sent = 0
if headers:
bytes_sent += sum(len(k.encode('utf-8')) + len(v.encode('utf-8'))
for k, v in headers.items())
if body:
if isinstance(body, str):
body_bytes = body.encode('utf-8')
elif isinstance(body, bytes):
body_bytes = body
else:
body_bytes = self.serializer.dumps(
body).encode('utf-8')
bytes_sent += len(body_bytes)
if params:
bytes_sent += len(url.encode('utf-8')) + len(
'&'.join(f"{k}={v}" for k, v in params.items()).encode('utf-8'))
else:
bytes_sent += len(url.encode('utf-8'))
# Measure time
start_time = time.time()
status, headers_response, data = connection.perform_request(
method,
url,
params,
body,
headers=headers,
ignore=ignore,
timeout=timeout,
)
# Measure time
end_time = time.time()
time_r = end_time - start_time
# Lowercase all the header names for consistency in accessing them.
headers_response = {
header.lower(): value for header, value in headers_response.items()
}
# Calculate bytes received
bytes_received = 0
if headers_response:
bytes_received += sum(len(k.encode('utf-8')) + len(v.encode('utf-8'))
for k, v in headers_response.items())
if data:
bytes_received += len(data)
except TransportError as e:
if method == "HEAD" and e.status_code == 404:
return False
retry = False
if isinstance(e, ConnectionTimeout):
retry = self.retry_on_timeout
elif isinstance(e, ConnectionError):
retry = True
elif e.status_code in self.retry_on_status:
retry = True
if retry:
try:
# only mark as dead if we are retrying
self.mark_dead(connection)
except TransportError:
# If sniffing on failure, it could fail too. Catch the
# exception not to interrupt the retries.
pass
# raise exception on last retry
if attempt == self.max_retries:
raise e
else:
raise e
else:
# connection didn't fail, confirm its live status
self.connection_pool.mark_live(connection)
if method == "HEAD":
return 200 <= status < 300
if data:
data = self.deserializer.loads(
data, headers_response.get("content-type")
)
# Return data and metrics
return data, bytes_sent, bytes_received, time_r
def __init__(self, limit: int = 200):
        '''
        Create an OpenSearchConnector.
        The object connects to OpenSearch and checks that the SQL plugin is enabled.
        '''
self.limit: int = limit
self.query_metrics: List[QueryMetrics] = []
self.opensearch = OpenSearch(
[{"host": OS_HOST, "port": OS_PORT}],
http_auth=(OS_USER, OS_PASS),
http_compress=True,
use_ssl=True,
verify_certs=False,
timeout=30,
ssl_show_warn=False,
transport_class=self.CustomTransport
)
if not self.opensearch.ping():
# Show the error message
print(self.opensearch.info())
exit(1)
# Check if sql plugin is enabled
list_json_plugins, _, _, _ = self.opensearch.cat.plugins(
params={"s": "component", "v": "true", "format": "json"})
list_plugins = [plugin["component"] for plugin in list_json_plugins]
if "opensearch-sql" not in list_plugins:
print("The SQL plugin is not enabled.")
exit(1)
def log_query(self, query: QueryMetrics) -> None:
self.query_metrics.append(query)
def dump_metrics(self) -> str:
total_bytes_sent = sum(
[query.bytes_sent for query in self.query_metrics])
total_bytes_received = sum(
[query.bytes_received for query in self.query_metrics])
total_time = sum([query.time_r for query in self.query_metrics])
dump = f"Total queries: {len(self.query_metrics)}\nTotal bytes sent: {total_bytes_sent}\nTotal bytes received: {total_bytes_received}\nTotal time: {total_time} seconds\n"
mean_time = total_time / len(self.query_metrics)
mean_bytes_sent = total_bytes_sent / len(self.query_metrics)
mean_bytes_received = total_bytes_received / len(self.query_metrics)
dump += f"Mean time: {mean_time} seconds\nMean bytes sent: {mean_bytes_sent}\nMean bytes received: {mean_bytes_received}\n"
for i, query in enumerate(self.query_metrics):
dump += f"\nQuery {i + 1}\n{query.query}\nBytes sent: {query.bytes_sent}\nBytes received: {query.bytes_received}\nTime: {query.time_r} seconds\n"
return dump
def generate_event_table(keys, hits) -> str:
'''
Generate a table with the events, showing only the keys specified.
Args:
- keys: List[str]: List of the keys to show in the table
- hits: List[dict]: List of the events to show in the table
'''
# Create a PrettyTable object
table = PrettyTable()
# Set columns from the keys
# table.field_names = [k.split(".")[-1] for k in keys]
keys.append(ID_FIELD)
table.field_names = keys
# Add rows to the table
for entry in hits:
# Ensure each key is present in the dictionary to avoid KeyError
row = []
for k in keys:
it = entry
for subk in k.split("."):
try:
it = it[subk]
except KeyError:
it = "-"
break
row.append(it)
table.add_row(row)
# Set table style
table.border = True
table.horizontal_char = '-'
table.vertical_char = '|'
table.junction_char = '+'
# Return the table as a string
return table.get_string()
################################################
# Rule components
################################################
class Entry:
'''
Class that represents an event in the sequence.
Attributes:
- step: int: Step of the event in the sequence
- event: dict: Event data
- timestamp: int: Timestamp of the event
'''
def __init__(self, step: int, event: dict) -> None:
'''
Create an Entry object.
Args:
- step: int: Step of the event in the sequence
- event: dict: Event data
'''
self.step: int = step
self.event = event
# Get the timestamp of the event
field = get_field(event, TS_FIELD)
# If field is not string, convert it to string
if not isinstance(field, str):
field = str(field)
field = field.replace('Z', '+00:00')
date_obj = datetime.fromisoformat(field)
self.timestamp = int(date_obj.timestamp())
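# Standalone sketch of the timestamp normalization performed by Entry above:
# datetime.fromisoformat does not accept a trailing 'Z' before Python 3.11,
# so it is replaced with an explicit UTC offset first.
def _iso_to_epoch(ts: str) -> int:
    from datetime import datetime
    return int(datetime.fromisoformat(ts.replace('Z', '+00:00')).timestamp())
assert _iso_to_epoch('2024-05-08T01:20:09.000Z') == 1715131209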
class Cache:
    '''
    Class that represents a cache for the events; accumulates the events of the same sequence.
    Attributes:
    - elements: list: Entries accumulated for one group-by key
    '''
def __init__(self):
self.elements = []
def to_str(self, fields: List[str]) -> str:
'''
Cache to string
Args:
- fields: List[str]: List of the fields to print
'''
return generate_event_table(fields, [entry.event for entry in self.elements])
def __len__(self):
return len(self.elements)
class _sql_static_field:
'''
Class that represents a field filter in the query.
A field filter is a field that should be equal to a specific value.
Attributes:
- field: str: Field name
- value: str: Value to compare
'''
def __init__(self, field: str, value: str):
self.field = field
self.value = value
def get_query(self):
'''
Get the sql condition for the field filter.
Returns:
- str: SQL condition for the field filter I.E. (field = value)
'''
return f'{self.field} = "{self.value}"'
def evaluate(self, hit: dict) -> bool:
'''
Evaluate if the hit event match the field filter.
Args:
- hit: dict: Hit event from OpenSearch to evaluate
Returns:
- bool: True if the event match the field filter, False otherwise
'''
field = get_field(hit, self.field, False)
if field is None:
return False
# if field is a list, take the first element
if isinstance(field, list):
field = field[0]
return field == self.value
class _sql_dinamic_field:
    '''
    Class that represents a field filter in the query, used for group-by fields.
    A field filter for a field that should be equal across events; the value is not specified.
    Attributes:
    - field: str: Field name
    '''
def __init__(self, field: str):
self.field = field
def get_query(self) -> str:
'''
Get the sql query condition for the field filter.
Returns:
- str: SQL condition for the field exists I.E. (field IS NOT NULL)
'''
return f'{self.field} IS NOT NULL'
def evaluate(self, hit: dict) -> bool:
'''
Evaluate if the hit event match the field filter.
Args:
- hit: dict: Hit event from OpenSearch to evaluate
Returns:
- bool: True if the event match the field filter, False otherwise
'''
field = get_field(hit, self.field, False)
return field is not None
class Step:
"""
Represent a step in the sequence.
Attributes:
- filter: list[_sql_static_field]: Required fields with values to filter the events (Mandatory)
- frequency: int: Frequency of the step (default 1 hit)
- group_by_fields: list[_sql_dinamic_field]: Fields that should be equal to a other field (optional)
"""
def __init__(self, filter: List[_sql_static_field], frequency: int, group_by_fields: List[_sql_dinamic_field]):
self.filter: List[_sql_static_field] = filter
self.frequency: int = frequency
self.group_by_fields: List[_sql_dinamic_field] = group_by_fields
def get_query(self) -> str:
        '''
        Get the query for the step.
        The query is a string that represents the step, used to fetch the events from OpenSearch.
        It is an AND combination of the filter fields and the group-by fields, which must exist.
        Returns:
        - str: SQL query for the step
        '''
query = ' AND '.join([field.get_query() for field in self.filter + self.group_by_fields])
return f'({query})'
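    # For example (hypothetical values), a step with filter
    # [process.name = "nslookup.exe"] and group_by [process.pid] renders as:
    #   (process.name = "nslookup.exe" AND process.pid IS NOT NULL)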
def evaluate(self, hit: dict) -> bool:
'''
Evaluate if the hit event match the step.
Args:
- hit: dict: Hit event from OpenSearch to evaluate
Returns:
- bool: True if the event match the step, False otherwise
'''
for field in self.filter:
if not field.evaluate(hit):
return False
for field in self.group_by_fields:
if not field.evaluate(hit):
return False
return True
class Rule:
"""
Rule class that contains the information of a rule.
Attributes:
- name: str - Name of the rule
- timeframe: int - Timeframe for the rule
- group_by_fields: list[str] - Fields that should be the same (optional)
- static_fields: dict[str, str] - Fields that should be equal to a specific value (optional)
- last_ingested: int - Last event fetched
- _sequence: list[Step] - Sequence of the rule
- _caches: dict - Cache of the events, each key is a hash of the same fields
"""
def __init__(self, name: str, timeframe: int, group_by_fields: 'list[str]', static_fields: 'dict[str, str]'):
'''
Create a Rule object.
Args:
- name: str: Name of the rule
- timeframe: int: Timeframe for the rule
- group_by_fields: list[str]: Fields that should be the same (optional)
- static_fields: dict[str, str]: Fields that should be equal to a specific value (optional)
'''
self.name: str = name
self.timeframe: int = timeframe
self.group_by_fields: List[_sql_dinamic_field] = [_sql_dinamic_field(field) for field in group_by_fields]
self.static_fields: List[_sql_static_field] = [_sql_static_field(
field, value) for field, value in static_fields.items()]
self._sequence: List[Step] = []
self.last_ingested: int = 0
self._caches: dict = {}
self._alerts: List[List[int]] = []
def add_step(self, step: Step):
'''
Add a step to the sequence of the rule.
Args:
- step: Step: Step to add to the sequence
'''
# Verify if the step has the same length of the equal fields of the previous step
if len(self._sequence) > 0 and len(step.group_by_fields) != len(self._sequence[-1].group_by_fields):
            raise ValueError('Error: The step does not have the same number of group-by fields as the previous step.')
self._sequence.append(step)
def _get_global_query_condition(self):
'''
Get the global query condition for the rule.
The global query condition is a combination of the same fields and static fields in AND condition,
all events should match this condition.
Returns:
- str: SQL query for the global condition
'''
query = ' AND '.join([field.get_query() for field in self.group_by_fields + self.static_fields])
# If there is no global query return an empty string
if query == '':
return ''
return f'({query})'
def _get_condition(self) -> str:
        '''
        Get the condition query for the rule.
        The query is a string that represents the rule, used to fetch the events from OpenSearch.
        It is the global query ANDed with the OR-combination of the sequence step queries.
        Returns:
        - str: Query for the rule
        '''
global_query = self._get_global_query_condition()
sequence_query = ' OR '.join(
[step.get_query() for step in self._sequence])
# If there is no global query return the sequence query
if global_query == '':
return sequence_query
return f'{global_query} AND ({sequence_query})'
def get_query(self) -> str:
'''
Get the query for the rule
The query is a string that represents the rule to fetch the events from OpenSearch.
'''
time_str = datetime.fromtimestamp(self.last_ingested, timezone.utc).strftime('%Y-%m-%dT%H:%M:%S')
query_str = f"SELECT * FROM {INDEX} AS idx WHERE {TS_FIELD} > '{time_str}' AND ({self._get_condition()}) ORDER BY {TS_FIELD} ASC"
return query_str
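    # The generated SQL has the following shape (placeholders are illustrative):
    #   SELECT * FROM wazuh-alerts-5.x-* AS idx
    #   WHERE event.ingested > '<last_ingested>'
    #     AND ((<global condition>) AND ((<step 1>) OR (<step 2>) OR ...))
    #   ORDER BY event.ingested ASC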
def _list_fields(self) -> List[str]:
'''
List all unique fields interested in the rule.
Returns:
- List[str]: List of the fields of the rule
'''
fields: List[str] = [TS_FIELD]
for field in self.group_by_fields:
fields.append(field.field)
for field in self.static_fields:
fields.append(field.field)
for step in self._sequence:
for field in step.filter:
fields.append(field.field)
for field in step.group_by_fields:
fields.append(field.field)
# Remove duplicates
fields = list(dict.fromkeys(fields))
return fields
def _fetch_events(self, osc: OpenSearchConnector) -> List[dict]:
        '''
        Fetch the events from OpenSearch.
        The function fetches the events since the last ingested event.
        '''
if DEBUG_LEVEL > 0:
print(f'{bcolors.BOLD}{bcolors.OKBLUE}Fetching events...{bcolors.ENDC}{bcolors.ENDC}')
response = None
query = self.get_query() + f" LIMIT {osc.limit};"
try:
response, bytes_sent, bytes_received, time_r = osc.opensearch.transport.perform_request(
url="/_plugins/_sql/",
method="POST",
params={"format": "json", "request_timeout": 30},
body={"query": query}
)
osc.log_query(QueryMetrics(query, bytes_sent, bytes_received, time_r))
except Exception as e:
print(f"Error: {e}")
exit(1)
        # TODO: check whether this error handling is correct
if 'error' in response:
print(f"Error: {response['error']['reason']}")
exit(1)
if 'hits' not in response or 'hits' not in response['hits'] or len(response['hits']['hits']) == 0:
return []
# Save the last time of the query
last_ingested_str = response['hits']['hits'][-1]['_source']['event']['ingested']
self.last_ingested = int(datetime.fromisoformat(
last_ingested_str.replace('Z', '+00:00')).timestamp())
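        # Note: get_query() filters with a strictly greater timestamp, so if
        # LIMIT cuts a page in the middle of events sharing this timestamp,
        # the remainder is skipped (see the timestamp issue discussed above;
        # candidate fixes are '>=' plus event hashes, or cursors).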
if DEBUG_LEVEL > 0:
print(f"Last ingested: {last_ingested_str}")
if DEBUG_LEVEL > 1:
print(f"Query: {query}")
# Create a list of event of the response
hit_list = response['hits']['hits']
# Check the events
events: List[dict] = []
for hit in hit_list:
events.append(hit['_source'])
if DEBUG_LEVEL > 0:
print(f"Events fetched: {len(events)}")
if DEBUG_LEVEL > 1:
print(f'{bcolors.OKGREEN}{generate_event_table(self._list_fields(), events)}{bcolors.ENDC}')
return events
def _fill_cache(self, events: List[dict]):
'''
Fill the cache with the events.
The function insert the events in the cache.
'''
if DEBUG_LEVEL > 0:
print(f'{bcolors.BOLD}{bcolors.OKBLUE}Filling cache...{bcolors.ENDC}{bcolors.ENDC}')
# Check which stage an event belongs to
for event in events:
entry = None
# Check if the event match a Step
for i, step in enumerate(self._sequence):
if step.evaluate(event):
entry = Entry(i, event)
break
if entry is None:
print(f"{bcolors.FAIL}Error: Event does not match any step.{bcolors.ENDC}")
exit(1)
            # Get the values of the rule-level and step-level group-by fields
            rule_group_by_fields_values = [get_field(event, field.field) for field in self.group_by_fields]
            group_by_fields_values = [get_field(event, field.field)
                                      for field in self._sequence[entry.step].group_by_fields]
            obj_hash = hash(tuple(rule_group_by_fields_values + group_by_fields_values))
            hash_str = str(obj_hash)
            if DEBUG_LEVEL > 2:
                print("---- ANALYZING EVENT ----")
                # print(f"Event: {event}")
                print(f"Event ts: {get_field(event, TS_FIELD)}")
                print(f"Step: {entry.step}")
                print(f"Rule group-by values: {rule_group_by_fields_values}")
                print(f"Step group-by values: {group_by_fields_values}")
                print(f"Hash: {hash_str}")
            # Search the cache
            if hash_str in self._caches:
                cache = self._caches[hash_str]
                cache.elements.append(entry)
            else:
                cache = Cache()
                cache.elements.append(entry)
                self._caches[hash_str] = cache
# Print the cache
if DEBUG_LEVEL > 0:
print(f'{bcolors.BOLD}{bcolors.HEADER}*=============== New cache state for rule: {self.name} ===============*{bcolors.ENDC}{bcolors.ENDC}')
for key, cache in self._caches.items():
print(f"Key: {key}")
print(f'{bcolors.OKGREEN}{cache.to_str(self._list_fields())}{bcolors.ENDC}')
class _match_result:
'''
Class that represents the result of the match of the cache with the sequence.
Attributes:
- dirty: bool: True if the cache was modified
- events: list[dict]: List of the events that match the sequence
'''
        def __init__(self, cache_modified: bool, events: Optional[List[dict]] = None):
            # Avoid the shared mutable default argument pitfall
            self._events: List[dict] = events if events is not None else []
            self._cache_modified: bool = cache_modified
def __bool__(self) -> bool:
return self._cache_modified
def alert_events(self) -> List[dict]:
return self._events
def _match_sequence(self, cache: Cache) -> _match_result:
'''
Check if the cache match the sequence.
Args:
- cache: Cache: Cache to check
Returns:
- _match_result: Result of the match
'''
current: int = 0 # Current event index
step_index: int = 0 # Current step index
hit_counter: int = 0 # Current count of events that match the step condition
# List index of events that match the condition of the step
list_success_events: List[int] = []
# Iterate over the events in the cache
while current < len(cache.elements) and step_index < len(self._sequence):
# Check which step the event matches
event = cache.elements[current]
# If the event is from the next step, remove the event from the cache
if event.step > step_index:
cache.elements.pop(current)
continue
# If the event is from the previous step, skip it
if event.step < step_index:
current += 1
continue
            # If it is the same step, check whether the event falls outside the timeframe
if event.timestamp - cache.elements[0].timestamp > self.timeframe:
# Remove first element, and try all steps again
cache.elements.pop(0)
return self._match_result(True)
# The event meets the step condition
hit_counter += 1 # Increase the hit counter
# Add the event to the list of successful events
list_success_events.append(current)
current += 1 # Move to the next event
# Check if the step condition was met the required number of times
if hit_counter == self._sequence[step_index].frequency:
# Move to the next step
step_index += 1
hit_counter = 0
# Matched all the steps
if step_index == len(self._sequence):
# Extract the matched events from the cache
alert_event: List[dict] = []
for i in reversed(list_success_events):
alert_event.append(cache.elements.pop(i).event)
alert_event.reverse()
return self._match_result(True, alert_event)
return self._match_result(False)
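    # Example walk-through: for a sequence [step A with frequency 2, step B with
    # frequency 1] and cached entries [A, A, B] inside the timeframe, the loop
    # counts two A hits, advances to step B, matches it, pops the three entries
    # from the cache and returns them as the alert events.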
def match(self) -> None:
        '''
        Match the events in the cache with the sequence.
        The function checks the events in each cache and tries to match the sequence.
        - If the sequence is matched, the events are removed from the cache and stored in the _alerts attribute.
        - If an event is unnecessary, it is removed from the cache.
        '''
# Iterate over the caches
print(f'{bcolors.BOLD}{bcolors.HEADER}*=============== Matching sequences for rule: {self.name} ===============*{bcolors.ENDC}{bcolors.ENDC}')
new_caches_state: bool = False
for key, cache in self._caches.items():
# Check if the cache matches the sequence and return the events
while result := self._match_sequence(cache):
new_caches_state = True
if len(result.alert_events()) > 0:
if DEBUG_LEVEL > 0:
print(f'{bcolors.BOLD}{bcolors.UNDERLINE}{bcolors.OKCYAN}Matched sequence:{bcolors.ENDC}{bcolors.ENDC}{bcolors.ENDC}')
print(f'source cache key: {key}')
print(f'{bcolors.OKCYAN}{generate_event_table(self._list_fields(), result.alert_events())}{bcolors.ENDC}')
print()
self._alerts.append(get_list_id(result.alert_events()))
if DEBUG_LEVEL > 0 and new_caches_state:
print(f'{bcolors.BOLD}{bcolors.HEADER}*=============== New cache state for rule: {self.name} ===============*{bcolors.ENDC}{bcolors.ENDC}')
for key, cache in self._caches.items():
print(f"Key: {key}")
print(f'{bcolors.OKGREEN}{cache.to_str(self._list_fields())}{bcolors.ENDC}')
def update(self, osc: OpenSearchConnector) -> bool:
        '''
        Fetch the events from OpenSearch, update the cache and match the sequence.
        The function fetches the events since the last ingested event and updates the cache with the new events.
        Returns:
        - False if no events were fetched, True otherwise
        Args:
        - osc: OpenSearchConnector: OpenSearchConnector object
        '''
if DEBUG_LEVEL > 0:
print(f'{bcolors.BOLD}{bcolors.HEADER}*=============== Updating rule: {self.name} ===============*{bcolors.ENDC}{bcolors.ENDC}')
events = self._fetch_events(osc)
if len(events) == 0:
if DEBUG_LEVEL > 0:
print(f'{bcolors.WARNING}No events fetched.{bcolors.ENDC}')
return False
self._fill_cache(events)
self.match()
return True
################################################
# Dumpers
################################################
def dump_alerts_as_json(rules: List[Rule]) -> str:
'''
Dump the alerts as a JSON string.
Dump alerts with the format:
[
{
"rule_id": 1,
"rule_name": "Rule 1",
"num_alerts": 2,
"alerts": [ [1, 2, 3], [4, 5, 6] ]
},
{
"rule_id": 2,
"rule_name": "Rule 2",
"num_alerts": 2,
"alerts": [ [1, 2, 3], [4, 5, 6] ]
}
]
Args:
- rules: List[Rule]: List of the rules
Returns:
- str: JSON string with the alerts
'''
alerts = []
for i, rule in enumerate(rules):
alerts.append({
"rule_id": i,
"rule_name": rule.name,
"num_alerts": len(rule._alerts),
"alerts": rule._alerts
})
# Dump the alerts as a pretty JSON string with '"' as separators
return json.dumps(alerts, indent=4, separators=(',', ': '))
def dump_query_metrics_as_csv(list_metrics: List[QueryMetrics]) -> str:
'''
Dump the query metrics as csv string.
Dump the query metrics with the format:
id,bytes_sent,bytes_received,time_r
1,100,200,0.1
Args:
- list_metrics: List[QueryMetrics]: List of the query metrics
Returns:
- str: CSV string with the query metrics
'''
# Create the CSV string
csv_str = 'id,bytes_sent,bytes_received,time_r\n'
for i, metric in enumerate(list_metrics):
csv_str += f'{i},{metric.bytes_sent},{metric.bytes_received},{metric.time_r}\n'
return csv_str
################################################
# Parsers
################################################
def parse_yaml(file_path: str) -> dict:
'''
Parse a YAML file and return the data as a dictionary.
'''
with open(file_path, 'r') as file:
data = yaml.safe_load(file)
return data
def parse_step(sequence: dict) -> Step:
# Filter events to fetch on OpenSearch (Mandatory)
static_fields_list: list[_sql_static_field] = []
frequency: int = 1 # Frequency of the step (default 1 hit)
# Fields that should be equal (optional)
group_by_fields: list[_sql_dinamic_field] = []
    # Get the check filters (if present, a list of single-entry maps of field name to value)
    try:
        raw_check = sequence['check']
        # Check that 'check' is a list of objects with key and value strings
        if not isinstance(raw_check, list) or not all(isinstance(field, dict) for field in raw_check):
            raise ValueError('Error: check must be a list of objects with key and value strings.')
        # Iterate over the check entries, adding them to static_fields_list
        for field in raw_check:
            for field_name, field_value in field.items():
                static_fields_list.append(_sql_static_field(field_name, field_value))
    except KeyError:
        raise ValueError('Error: check not found in the step.')
    # Get the frequency (if present, it must be a positive integer)
try:
frequency = sequence['frequency']
# Check if the frequency is an integer and positive
if not isinstance(frequency, int) or frequency < 1:
raise ValueError('Error: frequency must be a positive integer.')
except KeyError:
pass
# Get the equal fields (if exists should be a list of strings)
try:
raw_group_by = sequence['group_by']
# Check if the raw_group_by is a list of strings
if not isinstance(raw_group_by, list) or not all(isinstance(field, str) for field in raw_group_by):
raise ValueError('Error: group_by_fields must be a list of strings.')
# Create the group_by_fields
group_by_fields = [_sql_dinamic_field(field) for field in raw_group_by]
except KeyError:
pass
return Step(static_fields_list, frequency, group_by_fields)
def parse_rule(rule: dict) -> Rule:
timeframe: int = 0 # Timeframe for the rule
group_by_fields: list[str] = [] # Fields that should be the same (optional)
# Fields that should be equal to a specific value (optional)
static_fields: dict[str, str] = {}
name: str = "No name"
# Get the name of the rule
try:
name = rule['name']
except KeyError:
pass
# Get the timeframe
try:
timeframe = rule['timeframe']
# Check if the timeframe is an integer and positive
if not isinstance(timeframe, int) or timeframe < 1:
raise ValueError('Error: timeframe must be a positive integer.')
except KeyError:
raise ValueError('Error: timeframe not found in the rule.')
    # Get the group_by fields (if present, must be a list of strings)
try:
group_by_fields = rule['group_by']
# Check if the group_by_fields is a list of strings
if not isinstance(group_by_fields, list) or not all(isinstance(field, str) for field in group_by_fields):
raise ValueError('Error: group_by_fields must be a list of strings.')
except KeyError:
pass
    # Get the static fields (if present, a list of objects with key and value strings)
try:
raw_static_fields = rule['check']
# Check if the static_fields is a list of object with key and value strings
if not isinstance(raw_static_fields, list) or not all(isinstance(field, dict) for field in raw_static_fields):
raise ValueError('Error: static_fields must be a list of object with key and value strings.')
# iterate over the static fields (raw_static_fields) adding them to the static_fields
for field in raw_static_fields:
for field_name, field_value in field.items():
static_fields[field_name] = field_value
except KeyError:
pass
# Check if the sequence is a list of steps
try:
sequence = rule['sequence']
# Check if the sequence is a list of steps
if not isinstance(sequence, list) or not all(isinstance(step, dict) for step in sequence):
raise ValueError('Error: sequence must be a list of steps.')
except KeyError:
raise ValueError('Error: sequence not found in the rule.')
# Create the rule
r = Rule(name, timeframe, group_by_fields, static_fields)
# Parse the steps
for step in sequence:
s = parse_step(step)
r.add_step(s)
return r
################################################
def main(args):
# Load the rule definition from the YAML file
yaml_file = args.i
# Set fast mode, end if no event is fetched, no sleep
fast_mode: bool = False
if args.f:
fast_mode = args.f
# Truncate the output files, if they exist
json_file = None
csv_file = None
if args.p:
base_path = os.path.dirname(args.i) if args.d == '.' else args.d
# create if not exists
if not os.path.exists(base_path):
os.makedirs(base_path)
json_file = f'{base_path}/{args.p}alerts.json'
csv_file = f'{base_path}/{args.p}metrics.csv'
with open(json_file, "w") as f:
f.write("")
with open(csv_file, "w") as f:
f.write("")
# Create the OpenSearchConnector object
osc = OpenSearchConnector(args.l)
rule_definitions = parse_yaml(yaml_file)
rule_list: List[Rule] = []
# Check if rule_definition is a list of rules (Array of objects)
if not isinstance(rule_definitions, list) or not all(isinstance(rule_definition, dict)
for rule_definition in rule_definitions):
raise ValueError('Error: rule_definition must be an array of objects.')
for rule_definition in rule_definitions:
rule = parse_rule(rule_definition)
rule_list.append(rule)
try:
while True:
event_fetched : bool = False
for rule in rule_list:
event_fetched = rule.update(osc) or event_fetched
if not fast_mode:
time.sleep(1)
elif not event_fetched:
                raise KeyboardInterrupt  # reuse the interrupt handler below to dump results and exit
except KeyboardInterrupt:
if DEBUG_LEVEL > 0:
print(f'{bcolors.BOLD}{bcolors.HEADER}*=============== Alerts ===============*{bcolors.ENDC}{bcolors.ENDC}')
print (dump_alerts_as_json(rule_list))
            print(f'{bcolors.BOLD}{bcolors.HEADER}*=============== Query Stats ===============*{bcolors.ENDC}{bcolors.ENDC}')
print(dump_query_metrics_as_csv(osc.query_metrics))
#print(osc.dump_metrics())
# Save the alerts
if json_file and csv_file:
print(f"{bcolors.OKGREEN}Saving alerts to {json_file} ...{bcolors.ENDC}")
with open(json_file, "a") as f:
f.write(dump_alerts_as_json(rule_list))
print(f"{bcolors.OKGREEN}Saving query metrics to {csv_file} ...{bcolors.ENDC}")
with open(csv_file, "a") as f:
f.write(dump_query_metrics_as_csv(osc.query_metrics))
print(f"{bcolors.WARNING}Exiting...{bcolors.ENDC}")
if __name__ == "__main__":
# Rule input
parser = argparse.ArgumentParser(description='Load a sequence rule.')
parser.add_argument(
'-i', type=str, help='Path to the input YAML file.', required=True)
# Optional output files
parser.add_argument('-p', type=str, help='Prefix for the output files. If not specified, no output files are generated.')
# Dir output
parser.add_argument('-d', type=str, help='Path to the output directory if -p is specified.', default='.')
# Optional end if no event is fetched
    parser.add_argument('-f', type=bool, help='Fast mode: end when no event is fetched, no sleep (any non-empty value enables it).')
# Optional limit of events
parser.add_argument('-l', type=int, help='Limit of events to fetch.', default=200)
args = parser.parse_args()
    main(args)

Analysis script
Many improvements have been made to the PoC output analyzer; among them, it now allows the comparison of metrics. It still needs to be improved, especially the plotting part. PoC updated here

#!/usr/bin/env python3
import pandas as pd
import matplotlib.pyplot as plt
import argparse
import os
def analyze_csv(file_path):
# Read the CSV file
df = pd.read_csv(file_path)
# Metrics
total_time = df['time_r'].sum()
total_bytes_received = df['bytes_received'].sum()
query_count = df['id'].count()
avg_bytes_per_query = total_bytes_received / query_count
avg_bytes_sent_per_query = df['bytes_sent'].mean()
metrics = {
"total_time": total_time,
"total_bytes_received": total_bytes_received,
"query_count": query_count,
"avg_bytes_per_query": avg_bytes_per_query,
"avg_bytes_sent_per_query": avg_bytes_sent_per_query
}
# Plots
plt.figure(figsize=(10, 6))
# Bytes received vs response time
plt.scatter(df['bytes_received'], df['time_r'])
plt.title('Bytes Received vs Response Time')
plt.xlabel('Bytes Received')
plt.ylabel('Response Time (s)')
plt.grid(True)
plt.savefig(f'{os.path.splitext(file_path)[0]}_bytes_vs_time.png')
plt.close()
# Response time histogram
plt.hist(df['time_r'], bins=20)
plt.title('Response Time Histogram')
plt.xlabel('Response Time (s)')
plt.ylabel('Frequency')
plt.grid(True)
plt.savefig(f'{os.path.splitext(file_path)[0]}_time_histogram.png')
plt.close()
# Bytes sent vs bytes received
plt.scatter(df['bytes_sent'], df['bytes_received'])
plt.title('Bytes Sent vs Bytes Received')
plt.xlabel('Bytes Sent')
plt.ylabel('Bytes Received')
plt.grid(True)
plt.savefig(f'{os.path.splitext(file_path)[0]}_sent_vs_received.png')
plt.close()
# Bytes received per query
plt.bar(df['id'], df['bytes_received'])
plt.title('Bytes Received per Query')
plt.xlabel('Query ID')
plt.ylabel('Bytes Received')
plt.grid(True)
plt.savefig(f'{os.path.splitext(file_path)[0]}_bytes_per_query.png')
plt.close()
return metrics
def compare_metrics(metrics_list, output_dir):
combined_metrics = {
"total_time": [],
"total_bytes_received": [],
"query_count": [],
"avg_bytes_per_query": [],
"avg_bytes_sent_per_query": []
}
# Add trailing slash to output_dir if not present
if output_dir[-1] != '/':
output_dir += '/'
for metrics in metrics_list:
for key in combined_metrics:
combined_metrics[key].append(metrics[key])
# Create comparison plots
plt.figure(figsize=(10, 6))
# Total time comparison
plt.bar(range(len(metrics_list)), combined_metrics["total_time"])
plt.title('Total Time Comparison')
plt.xlabel('CSV File Index')
plt.ylabel('Total Time (s)')
plt.savefig(output_dir + 'comparison_total_time.png')
plt.close()
# Total bytes received comparison
plt.bar(range(len(metrics_list)), combined_metrics["total_bytes_received"])
plt.title('Total Bytes Received Comparison')
plt.xlabel('CSV File Index')
plt.ylabel('Total Bytes Received')
plt.savefig(output_dir + 'comparison_total_bytes_received.png')
plt.close()
# Average bytes per query comparison
plt.bar(range(len(metrics_list)), combined_metrics["avg_bytes_per_query"])
plt.title('Average Bytes per Query Comparison')
plt.xlabel('CSV File Index')
plt.ylabel('Average Bytes per Query')
plt.savefig(output_dir + 'comparison_avg_bytes_per_query.png')
plt.close()
# Average bytes sent per query comparison
plt.bar(range(len(metrics_list)), combined_metrics["avg_bytes_sent_per_query"])
plt.title('Average Bytes Sent per Query Comparison')
plt.xlabel('CSV File Index')
plt.ylabel('Average Bytes Sent per Query')
plt.savefig(output_dir + 'comparison_avg_bytes_sent_per_query.png')
plt.close()
def main(input_files):
all_metrics = []
for file_path in input_files:
metrics = analyze_csv(file_path)
all_metrics.append(metrics)
print(f'Metrics for {file_path}: {metrics}')
# Save metrics to file
with open(f'{os.path.splitext(file_path)[0]}_metrics.json', 'w') as f:
f.write(str(metrics).replace("'", "\""))
if len(input_files) > 1:
# get dir of output files
output_dir = os.path.dirname(os.path.abspath(input_files[0]))
compare_metrics(all_metrics, output_dir)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Analyze and compare CSV files from PoC tests.')
parser.add_argument('input_files', nargs='+', help='List of CSV files to analyze')
args = parser.parse_args()
main(args.input_files)
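As a quick, self-contained check of the metric definitions computed above (the CSV rows here are made-up sample data, not real PoC output):

```python
import io

import pandas as pd

# Two fake query-metric rows in the PoC's CSV format
csv_data = io.StringIO(
    "id,bytes_sent,bytes_received,time_r\n"
    "0,100,200,0.1\n"
    "1,100,400,0.3\n"
)
df = pd.read_csv(csv_data)

total_time = df['time_r'].sum()                                      # ~0.4 s
avg_bytes_per_query = df['bytes_received'].sum() / df['id'].count()  # 300.0
print(total_time, avg_bytes_per_query)
```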
Benchmark of limit
A script was created to automate calls to the previous scripts, using different limits to generate the data; the bash script is attached. Script here

#!/bin/bash
# Change the current directory to the script's directory
OLD_DIR=$(pwd)
cd "$(dirname "$0")"
# Define global constants for the directory and script names
OUTPUT_DIR="output"
POC_SCRIPT="./PoC_cache.py"
ANALYZE_SCRIPT="./analyze_csv.py"
# Check if the correct number of arguments was provided
if [ "$#" -ne 2 ]; then
echo "Error: Incorrect number of arguments."
echo "Usage: $0 <initial value> <maximum number of steps>"
exit 1
fi
# Assign arguments to descriptive variables
initial_value=$1
max_steps=$2
# Initialize the number with the provided initial value
number=$initial_value
# Initialize an empty array to store the CSV files
csv_files=()
# Iterate the given number of steps
for (( i=0; i<max_steps; i++ )); do
    # Cap the limit at 10000 events
    if [ $number -gt 10000 ]; then
        number=10000
    fi
    # Execute the PoC script with the current value as the limit
    $POC_SCRIPT -i rule_test.yaml -d $OUTPUT_DIR -p "${number}_" -l "$number" -f True
    # Add ${OUTPUT_DIR}/${number}_metrics.csv to the csv_files array
    csv_files+=("${OUTPUT_DIR}/${number}_metrics.csv")
    # Stop once the cap has been reached; otherwise double the limit for the next step
    if [ $number -ge 10000 ]; then
        break
    fi
    number=$(( number * 2 ))
done
echo "Processing complete, now analyzing CSV files in $OUTPUT_DIR..."
# Check if exists all CSV files
for csv_file in "${csv_files[@]}"; do
    if [ ! -f "$csv_file" ]; then
echo "Error: CSV file $csv_file not found."
exit 1
fi
done
# Convert the array of CSV files to a string
csv_files=$(IFS=' '; echo "${csv_files[*]}")
# Execute another Python script to analyze all collected CSV files
$ANALYZE_SCRIPT $csv_files

Some results
The script was executed with the following parameters: ./benchmark.sh 50 9
This means that it will run the same test with limits of 50, 100, 200, 400, 800, 1600, 3200, 6400, and 10000 events. I attach all the results, but this comparison is the most interesting. You can see how the total response time decreases as the limit increases:

Next steps
Update
Daily update
Update
Solved the timestamp issues: there was an error in the algorithm, and the indexer returns timestamps in UTC format with Europe/Madrid time. Fixed the error; the PoC now communicates in UTC format with the Madrid timestamp, since this is only a PoC. In a production environment, proper locale and time handling must be in place. Working on improving the analysis script: added a box plot view for response time and changed the time unit to ms, plus some QoL improvements using high-level libraries and typing.
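To illustrate the kind of normalization involved (a sketch, not the PoC's actual code), a naive indexer timestamp interpreted as Europe/Madrid local time can be converted to a proper UTC value with Python's zoneinfo:

```python
from datetime import datetime
from zoneinfo import ZoneInfo

def to_utc(ts: str) -> str:
    """Interpret a naive 'YYYY-MM-DDTHH:MM:SS' string as Europe/Madrid
    local time and return the equivalent UTC timestamp."""
    local = datetime.fromisoformat(ts).replace(tzinfo=ZoneInfo("Europe/Madrid"))
    return local.astimezone(ZoneInfo("UTC")).strftime("%Y-%m-%dT%H:%M:%S")

# In May, Madrid is on CEST (UTC+2), so 01:22:07 local is 23:22:07 UTC
# of the previous day.
print(to_utc("2024-05-08T01:22:07"))
```

A production implementation would of course take the locale from configuration rather than hardcoding a zone name.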
Daily update
Results
Running the new
the files inside each folder have the prefix XX_ to order them correctly in the graphs. Files 00_ to 08_ correspond to PoC 1, where the event-fetching limit follows the prefix. File 09_poc2 corresponds to PoC 2 and does not need this tuning, since it does not fetch large amounts of events. I will skip the analysis of each individual test and focus specifically on the comparisons.

Potential DNS Tunneling via NsLookup
Definition of the rule:

- name: Potential DNS Tunneling via NsLookup
check:
- event.category: process
- host.os.type: windows
group_by:
- host.id
timeframe: 300
sequence:
- frequency: 10
check:
- event.type: start
- process.name: nslookup.exe

Average bytes per query
This rule does not make comparisons, but counts events by grouping them by host.id.

Average bytes sent per query
PoC 1 always sends the same query, only with a different timestamp, which is why its value is constant. In PoC 2 this varies, because it adds more conditions to the queries.

Total bytes received
This is interesting: on the one hand, the total bytes of PoC 1 remain constant, since it always seeks to fetch all events. The small curve that can be seen is due to the timestamp bug, which always fetches more than the last event read, as explained here. On the other hand, PoC 2 only fetches the events that could form part of the sequence, so some events are never fetched, which explains its lower value.

Total time used on queries
Finally, performing many queries is expensive: in PoC 1, the total time is directly proportional to the number of queries. It is more efficient to fetch all possible events and filter locally.

Potential Command and Control via Internet Explorer
Definition of the rule:

- name: Potential Command and Control via Internet Explorer
check:
- host.os.type: windows
group_by:
- host.id
- user.name
timeframe: 5
sequence:
- check:
- event.category: library
- dll.name: IEProxy.dll
frequency: 1
- check:
- event.category: process
- event.type: start
- process.parent.name: iexplore.exe
- process.parent.args: -Embedding
frequency: 1
- check:
- event.category: network
- network.protocol: dns
- process.name: iexplore.exe
frequency: 5
Average bytes per query
In PoC 1 you can see the updates for the 3 new event types being received, while in PoC 2 a large number of requests are made looking for particular events.

Average bytes sent per query
Here you can see how the search for particular events translates into longer and more complex queries at the sent-byte level.

Total bytes received
Same explanation as for rule 1: you can see the updates and the search for possible events.

Total time used on queries
This is where the great cost of performing a large number of complex queries shows: the time of PoC 2 is much higher than any other approach.

Multiple Logon Failure Followed by Logon Success
Definition of the rule:

- name: Multiple Logon Failure Followed by Logon Success
check:
- event.category: authentication
group_by:
- source.ip
- user.name
timeframe: 5
sequence:
- check:
- event.action: logon-failed
frequency: 5
- check:
- event.action: logon-success
frequency: 1
The explanation is the same as for the previous rule; the behavior is to be expected after having seen rule 2.
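To make the byte-level difference between the two approaches concrete, here is a rough sketch of the two query shapes (the actual SQL the PoCs emit is not reproduced in this issue, so the index and field names below are illustrative only):

```python
def poc1_query(index: str, last_ts: str) -> str:
    # PoC 1: one fixed query per rule; only the timestamp changes between
    # polls, so the bytes sent per query stay constant.
    return (f"SELECT id, ingested, srcip FROM {index} "
            f"WHERE ingested > '{last_ts}' ORDER BY ingested")

def poc2_query(index: str, last_ts: str, step_filters: list[str]) -> str:
    # PoC 2: per-step conditions are appended, so queries grow longer and
    # more complex as the rule advances through its sequence.
    where = " AND ".join([f"ingested > '{last_ts}'"] + step_filters)
    return (f"SELECT id, ingested, srcip FROM {index} "
            f"WHERE {where} ORDER BY ingested")

q1 = poc1_query("wazuh-alerts", "2024-05-08T01:22:07")
q2 = poc2_query("wazuh-alerts", "2024-05-08T01:22:07",
                ["`event.action` = 'logon-failed'", "`source.ip` = '10.0.0.1'"])
print(len(q2) > len(q1))  # the sequence-aware query is strictly longer
```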
Update
Working on using the opensearch-performance-analyzer plugin for the benchmarks. I was unable to start the plugin; I tried to start it directly too, but there are missing files. Pending a sync with the indexer team. Re-ran the benchmarks to verify the data is consistent with what has already been presented: it is.
Daily Update
Created scripts for data collection from the Wazuh-Indexer process, plus a script to process the collected metrics. I attach everything here. We have conducted benchmarking for both PoCs, using the psutil library to measure the CPU and memory usage of Wazuh-Indexer. Samples were taken every 0.01 seconds. A brief analysis of both PoCs is provided, focusing only on the most relevant results.

Results for Both PoCs
In this test, PoC 1 was run with event-fetching limits set at 50, 100, 200, 400, 800, 1600, 3200, 6400, and 10000 events, followed by PoC 2.

CPU
CPU usage shows peaks in PoC 1, and the more events requested per fetch, the lower the CPU usage. PoC 2, on the other hand, has higher and more continuous CPU usage. This is consistent with previous tests from a client perspective.

Memory
While memory usage seems higher in the second PoC, we cannot be certain, since memory not being freed may be due to the garbage collector policy and cache behavior of Wazuh-Indexer.

Disk Usage & File Descriptors
Disk usage is higher in the second PoC, as is the number of open file descriptors. Although this could be due to the sampling interval potentially being shorter than a query, the growth in accumulated reads and writes in PoC 2 can be observed.

PoC 1 vs. PoC 2
In isolated tests, where the indexer was restarted before each test and only the 10000 limit was used for the first PoC, PoC 1 showed lower CPU usage than PoC 2. This is consistent with previous tests from a client perspective.
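The collection loop behind these measurements can be sketched as follows (a minimal stand-in: `metric_fn` and the psutil call mentioned in the comment are illustrative, not the attached scripts):

```python
import time
from typing import Callable, List, Tuple

def sample(metric_fn: Callable[[], float], interval: float,
           duration: float) -> List[Tuple[float, float]]:
    """Poll metric_fn every `interval` seconds for `duration` seconds and
    return (elapsed_seconds, value) pairs."""
    start = time.monotonic()
    readings = []
    while (elapsed := time.monotonic() - start) < duration:
        readings.append((elapsed, metric_fn()))
        time.sleep(interval)
    return readings

# Demo with a dummy metric; in the real collection scripts metric_fn would be
# something like psutil.Process(indexer_pid).cpu_percent (hypothetical usage).
readings = sample(lambda: 42.0, interval=0.01, duration=0.05)
print(len(readings) >= 1)  # at least one sample collected
```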
Based on the collected data, we have concluded the investigation. Testing the OpenSearch Performance Analyzer plugin in Wazuh was not feasible at this stage. We will consider this data when we resume the rule correlation work in phase 2, as outlined in this issue: #23577
Description
Building on the insights from issue #23332, this issue aims to develop a Proof of Concept (PoC) that leverages the OpenSearch SQL plugin not for direct correlation rule execution but for fetching relevant data into a local cache. This data will then be processed using a custom-built algorithm designed to perform correlation analysis. The objective is to validate the effectiveness of using SQL for data extraction and to assess the performance and accuracy of the custom correlation algorithm.
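The fetch-then-correlate loop this PoC validates can be sketched as follows (class and field names here are illustrative, not the PoC's actual Rule/Step implementation):

```python
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class Cache:
    # Events bucketed by the rule's group_by value (e.g. host.id)
    groups: Dict[str, List[dict]] = field(default_factory=dict)

    def add(self, event: dict, group_by: str) -> None:
        self.groups.setdefault(str(event.get(group_by, "")), []).append(event)

def correlate(cache: Cache, frequency: int) -> List[List[dict]]:
    """Emit an alert whenever one group_by bucket accumulates `frequency`
    events (timeframe and multi-step checks omitted for brevity)."""
    alerts = []
    for bucket in cache.groups.values():
        if len(bucket) >= frequency:
            alerts.append(bucket[:frequency])
    return alerts

cache = Cache()
for ip in ["10.0.0.1", "10.0.0.1", "10.0.0.1", "10.0.0.2"]:
    cache.add({"srcip": ip}, "srcip")
print(len(correlate(cache, frequency=3)))  # 1 — only 10.0.0.1 hit the threshold
```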
Objective
Requirements
Tasks
Testing Criteria
Notes
This PoC is crucial for exploring innovative ways to enhance threat detection and response capabilities within the Wazuh ecosystem. By efficiently fetching data and processing it through a custom algorithm, we aim to significantly improve the responsiveness and accuracy of our security solutions.