In this article, we explore how Kafka Streams can be utilized for filtering and correlating events in real time, effectively transforming Kafka into a high-speed correlation engine. By leveraging the capabilities of ksqlDB, you can deploy content rules and filter alerts directly within Kafka. This approach enables real-time filtration and aggregation of log event flows using an intuitive SQL-like language.
Environment Setup
- Test Kafka Cluster: A single-node Kafka cluster operating in KRaft mode (Kafka Raft Metadata mode). KRaft replaces the traditional ZooKeeper setup, simplifying Kafka’s architecture and improving operational efficiency.
- Kafka-UI: A graphical user interface for managing and monitoring Kafka clusters. It allows easy visualization and manipulation of topics, schemas, and consumer groups.
- ksqlDB Server: This component enables real-time stream processing with SQL-like queries. It connects to the Kafka cluster, enabling filtering, transformation, and aggregation of streaming data.
- ksqlDB CLI: A command-line interface for interacting with the ksqlDB server. It allows users to write and execute SQL-like queries, define streams, and interact with data programmatically.
Our TEST environment:
Test Environment Overview
Step-by-Step Implementation
1. Connect to the ksqlDB CLI
Use the following command to connect to the ksqlDB CLI:
podman exec -it ksqldb-cli ksql http://ksqldb-server:8088
2. List Current Topics
To view all topics in your Kafka cluster, use:
3. Examine the Topic Schema
Inspect the wineventlogs topic to understand its data structure:
PRINT wineventlogs;
You can also use Kafka-UI to view the messages stored in the topic.
Defining a Stream Schema
To process data in ksqlDB, you must first define a stream schema that matches the JSON structure of your data. Below is the CREATE STREAM statement for the wineventlogs topic:
CREATE STREAM WINLOGEVENTS (
`@timestamp` VARCHAR,
`@metadata` STRUCT<
beat VARCHAR,
type VARCHAR,
version VARCHAR
>,
winlog STRUCT<
version INT,
event_data STRUCT<
ProcessId VARCHAR,
CommandLine VARCHAR,
SubjectDomainName VARCHAR,
MandatoryLabel VARCHAR,
SubjectUserName VARCHAR,
SubjectLogonId VARCHAR,
TargetUserSid VARCHAR,
SubjectUserSid VARCHAR,
TargetUserName VARCHAR,
ParentProcessName VARCHAR,
TargetLogonId VARCHAR,
NewProcessName VARCHAR,
TargetDomainName VARCHAR,
TokenElevationType VARCHAR,
NewProcessId VARCHAR
>,
api VARCHAR,
task VARCHAR,
process STRUCT<
pid INT,
thread STRUCT<
id INT
>
>,
keywords ARRAY<VARCHAR>,
provider_guid VARCHAR,
opcode VARCHAR,
record_id BIGINT,
computer_name VARCHAR,
event_id VARCHAR,
provider_name VARCHAR,
channel VARCHAR
>,
event STRUCT<
created VARCHAR,
code VARCHAR,
kind VARCHAR,
provider VARCHAR,
outcome VARCHAR,
action VARCHAR
>,
log STRUCT<
level VARCHAR
>,
message VARCHAR,
host STRUCT<
name VARCHAR,
hostname VARCHAR,
architecture VARCHAR,
os STRUCT<
version VARCHAR,
family VARCHAR,
name VARCHAR,
kernel VARCHAR,
build VARCHAR,
type VARCHAR,
platform VARCHAR
>,
id VARCHAR,
ip ARRAY<VARCHAR>,
mac ARRAY<VARCHAR>
>,
ecs STRUCT<
version VARCHAR
>,
agent STRUCT<
name VARCHAR,
type VARCHAR,
version VARCHAR,
ephemeral_id VARCHAR,
id VARCHAR
>
) WITH (
KAFKA_TOPIC = 'wineventlogs',
VALUE_FORMAT = 'JSON'
);
So, stream successfully created:
Lets test our WINLOGEVENTS stream, filter the stream as new data arrives:
So we see, that our stream works properly and we see all throughput messages.
Creating a Filtered Stream
Create a New Kafka Topic
First, create a topic for filtered messages windows_alerts:
Filter events using the SOC Prime TDM rule Possible Antivirus or Firewall Software Enumeration (via process_creation). Below is the CREATE STREAM statement:
- CommandLine according to data will be mapped to winlog->event_data->CommandLine
- ParentImage according to data will be mapped to winlog->event_data->ParentProcessName
- Host to host->hostname
CREATE STREAM WINDOWS_ALERTS WITH (KAFKA_TOPIC = 'windows_alerts') AS SELECT host->hostname AS Host, winlog->event_data->CommandLine AS CommandLine, winlog->event_data->ParentProcessName AS ParentImage FROM WINLOGEVENTS WHERE ( (winlog->event_data->CommandLine LIKE '%AntiSpywareProduct%' OR winlog->event_data->CommandLine LIKE '%AntiVirusProduct%' OR winlog->event_data->CommandLine LIKE '%FirewallProduct%') AND NOT (winlog->event_data->ParentProcessName LIKE '%phpstorm64.exe' OR winlog->event_data->ParentProcessName LIKE '%pycharm64.exe') AND winlog->event_data->CommandLine LIKE '%displayName%' AND winlog->event_data->CommandLine LIKE '%productState%') OR ( winlog->event_data->CommandLine LIKE '%Win32_Process%' AND winlog->event_data->CommandLine LIKE '%AvastUI.exe%' AND winlog->event_data->CommandLine LIKE '%AvastSvc.exe%' AND winlog->event_data->CommandLine LIKE '%FortiWF.exe%' AND winlog->event_data->CommandLine LIKE '%xagt.exe%' AND winlog->event_data->CommandLine LIKE '%fcappdb.exe%' );
So we defined the stream with filtering by our rule, lets test it!
Testing the Filtered Stream
Produce a Test Message
Publish a test message containing “fcappdb.exe” in the CommandLine field to the wineventlogs topic.
Check the Filtered Stream
Inspect the WINDOWS_ALERTS stream:
SELECT * FROM WINDOWS_ALERTS EMIT CHANGES;
And we see than Alert is here, cool!
And check the topic- we see the result correlated message here:
Thats it!
Conclusion
Using Kafka Streams with ksqlDB, you can build a high-performance correlation engine (like in SIEM). This approach allows for real-time event filtering, reducing reliance on traditional SIEM solutions. Additionally, by integrating content from SOC Prime TDM, you can enrich your correlation logic to detect sophisticated threats efficiently.
The post Using Kafka as a Fast Correlation Engine appeared first on SOC Prime.