Threat Hunting with Jupyter Notebooks — Part 3: Querying Elasticsearch via Apache Spark ✨

Roberto Rodriguez
Posts By SpecterOps Team Members
May 30, 2019


In the previous post, I introduced the concept of using DataFrames to represent and analyze security event logs in a tabular format, and showed you how to do so with the help of a Python library named Pandas.

In this post, I will show you how to consume security event logs directly from an Elasticsearch database, save them to a DataFrame and perform a few queries via the Apache Spark Python APIs and SparkSQL module.

This post is part of a five-part series. The other four parts can be found in the following links:

Requirements

  • This post assumes that you have read the previous one, deployed a HELK server, and understand the basics of data processing via Python DataFrames.

Let’s go over some of the Apache Spark concepts that will be used in this post.

What is Apache Spark?

Apache Spark is a unified computing engine and a set of libraries for parallel data processing on computer clusters.

Unified Analytics?

It supports a wide range of data analytics tasks over the same computing engine and with a consistent set of APIs. For example, Spark can combine the following APIs into one scan over the data:

  • Simple data loading
  • SQL queries
  • Machine learning
  • Streaming computation

Computing Engine?

  • Spark handles loading data from storage systems and performing computation on it (not permanent storage).
  • It takes a lot of resources to move data around. Therefore, Spark focuses on performing computations over the data, no matter where it resides.

Set of Libraries?

Spark supports the following libraries:

Spark Basic Architecture

One of the challenges of data analysis is processing the data, which is usually limited by the available computational resources. Spark provides parallel data processing, along with the ability to coordinate and schedule available resources and task execution across cluster workers.

Driver Process

  • Also called the SparkSession, it runs your main() function.
  • Manages information about the Spark application.
  • Responds to the user's code or input.
  • Analyzes, distributes and schedules work across executors.

Executors

  • Execute the tasks assigned by the driver process.
  • Report the state of each task's computation back to the driver.

Cluster Manager

  • Keeps track of the available resources.

How Do I Run Spark Code?

  • One can run code via Spark Language APIs such as Scala, Python, etc.
  • Concepts represented via the Language APIs get translated into Spark code and run on the cluster of machines.

Spark APIs

Even though you can interact with distributed data and run code via Spark Language APIs, there are two main Spark APIs that make all this possible:

  • Low-level unstructured APIs
  • Higher-level structured APIs

Structured API: Spark Dataframe

  • A DataFrame is the most common Structured API and simply organizes data into named columns and rows, like a table in a relational database.
  • You can think of a DataFrame as a spreadsheet with named columns.
  • A Python DataFrame sits on one computer in one specific location, whereas a Spark DataFrame can exist on multiple machines in a distributed way.
Single Machine Analysis: Python DataFrame
  • If you had a DataFrame bigger than your local hard drive, you would not be able to handle it with one machine.
DataFrame bigger than local endpoint
  • The Spark DataFrame concept allows you to perform parallel distributed analysis by breaking up the DataFrame into chunks spread across several servers.
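
The chunking idea can be illustrated without Spark at all. The sketch below is only a single-machine analogy written with the Python standard library: a list of dictionaries stands in for a DataFrame, worker threads stand in for executors, and the host names and the functions split_into_chunks and count_matches are made up for this example. Spark does the same kind of thing across real machines and with far more sophistication.

```python
from concurrent.futures import ThreadPoolExecutor

# Toy "DataFrame": each dict is one row with named columns.
# The host names are placeholders, not values from a real dataset.
rows = [
    {"computer_name": "host-a", "action": "processcreate"},
    {"computer_name": "host-a", "action": "networkconnect"},
    {"computer_name": "host-b", "action": "processcreate"},
    {"computer_name": "host-b", "action": "filecreate"},
    {"computer_name": "host-c", "action": "processcreate"},
]


def split_into_chunks(data, n_chunks):
    """Split the rows into n_chunks roughly equal chunks (Spark calls them partitions)."""
    return [data[i::n_chunks] for i in range(n_chunks)]


def count_matches(chunk, action):
    """Work done independently on one chunk: count rows with the given action."""
    return sum(1 for row in chunk if row["action"] == action)


chunks = split_into_chunks(rows, 3)

# Each worker thread processes one chunk; the partial counts are then combined.
with ThreadPoolExecutor(max_workers=3) as pool:
    partial_counts = list(
        pool.map(lambda chunk: count_matches(chunk, "processcreate"), chunks)
    )

total = sum(partial_counts)
print(partial_counts, total)
```

No single worker ever needs to see the whole table. Each one only returns a small partial result, and combining those partial results is cheap.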

What is Apache SparkSQL?

  • It is a Spark module that leverages Spark's functional programming APIs to allow relational (SQL) processing of structured data.
  • It provides a programming abstraction called DataFrames. When running SparkSQL from within another programming language, the results are returned as Spark DataFrames. In this post, we will use the basics of PySpark to interact with DataFrames via the Spark SQL module.
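
As a rough sketch of what that looks like in PySpark, a DataFrame can be registered as a temporary view and then queried with plain SQL through the SparkSession. The helper name count_events_by_action and the view name are assumptions made for illustration. The function expects an existing SparkSession and a DataFrame with an action column, such as the Sysmon DataFrame loaded later in this post.

```python
def count_events_by_action(spark, df, view_name="sysmon_events"):
    """Register df as a temporary view and count events per action with SQL.

    spark is an existing SparkSession and df a Spark DataFrame that has an
    'action' column. view_name is an arbitrary name chosen for this sketch.
    """
    # Make the DataFrame visible to the SQL engine under view_name.
    df.createOrReplaceTempView(view_name)
    query = (
        "SELECT action, COUNT(*) AS event_count "
        f"FROM {view_name} "
        "GROUP BY action "
        "ORDER BY event_count DESC"
    )
    # spark.sql returns a new Spark DataFrame; nothing is computed until an
    # action such as show() or collect() is called on it.
    return spark.sql(query)
```

Since the return value is itself a Spark DataFrame, calling count_events_by_action(spark, sysmon_df).show() would print the result as a table.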

What is PySpark?

  • PySpark is the Python API for Spark.
  • The DataFrame API is available in Scala, Java, Python, and R.
  • We can create a Jupyter Kernel to leverage the PySpark APIs and interact with the Spark cluster via a notebook. HELK already provides one.

Now that you understand the basics of Apache Spark, Spark DataFrames and the Spark Language APIs such as PySpark, we can start reading some data and performing a few queries.

I need a Data Source!

As mentioned before, Spark focuses on performing computations over the data, no matter where it resides. Therefore, it is important to identify where the data resides and how it can be accessed by Spark.

I currently store security event logs collected over the network in an Elasticsearch (ES) database as part of the HELK project. Therefore, I needed to figure out a way to query my ES database via SparkSQL.

Enter ES-Hadoop

Luckily, the integration between ES and a Spark Cluster is done via the Elastic ES-Hadoop library.

  • It is an open-source, stand-alone, self-contained, small library that allows Hadoop jobs to interact with Elasticsearch.
  • It allows data to flow bi-directionally, so that applications can transparently leverage the capabilities of the Elasticsearch engine.

Prepare Elasticsearch

Let’s prepare our environment with real data inside of Elasticsearch.

Check HELK (ELK containers)

  • Browse to your HELK’s IP address and log in as user helk with password hunting.
  • Click on the stack monitoring setting as shown in the image below.
  • Check that your cluster is running and that all the Elastic containers are reporting.

Download Kafkacat

  • If you are using a Debian-based system, make sure you install the latest Kafkacat deb package.
  • I recommend Ubuntu 18.04. You can check its Kafkacat deb package version and compare it with the latest one in the Kafkacat GitHub repo.
  • You can also install it from source following the Quick Build instructions.
  • Once it is installed, perform a quick connection test via the -L metadata parameter and point to the HELK Kafka topic winlogbeat. That topic should have been created automatically by HELK when it was built.
kafkacat -b 192.168.64.138:9092 -t winlogbeat -L

Metadata for winlogbeat (from broker 1: 192.168.64.138:9092/1):
1 brokers:
broker 1 at 192.168.64.138:9092
1 topics:
topic "winlogbeat" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1

Consume Mordor Dataset

Instead of standing up a whole detection lab or several Windows computers on the fly, we are going to use the same Mordor dataset, empire_invoke_wmi, that we used in the previous post and send it to our HELK stack via Kafkacat 🌶

kafkacat -b 192.168.64.138:9092 -t winlogbeat -P -l empire_invoke_wmi_2019-05-18214442.json
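
Before or after sending the file, it can be useful to peek at what it contains. The following is a minimal sketch using only the Python standard library. It assumes the dataset is a text file with one JSON event per line, and that events carry an event_id field. That field is an assumption about the Mordor format rather than something shown in this post. The function name summarize_events is hypothetical.

```python
import json
from collections import Counter


def summarize_events(path, field="event_id"):
    """Count the events in a JSON-lines file, grouped by the given field."""
    counts = Counter()
    with open(path, encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            event = json.loads(line)
            # Events that lack the field are grouped under None.
            counts[event.get(field)] += 1
    return counts
```

For example, summarize_events("empire_invoke_wmi_2019-05-18214442.json").most_common(5) would list the five most frequent values. The total of all counts gives a number to compare against what later shows up in Kibana.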

Refresh Kibana Discover View

  • The Mordor dataset was recorded on May 18, 2019, at around 18:00 hours.
  • Make sure you have the right time window selected to validate that the data made it to the Elasticsearch database.

Read from Elasticsearch via Apache Spark

We are ready to start using the ES-Hadoop library to allow Spark to read, analyze and represent data from Elasticsearch via its structured DataFrame APIs and SQL module.

Create a New Notebook

  • Make sure you select the PySpark kernel.

Import SparkSession

  • We start by importing the class SparkSession from the PySpark SQL module.
  • The SparkSession is the main entry point for DataFrame and SQL functionality. A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and even read parquet files.
from pyspark.sql import SparkSession

Create a SparkSession

  • In order to create a SparkSession, we use the builder class.
  • We give our Spark application a name and also set the Spark Master to our helk-spark-master container. All that is already taken care of by HELK. That means that we are going to be using the HELK Spark cluster to execute any tasks scheduled by the SparkSession.
spark = SparkSession.builder \
    .appName("HELK Reader") \
    .master("spark://helk-spark-master:7077") \
    .enableHiveSupport() \
    .getOrCreate()

Verify Spark Variable

  • Once the SparkSession is built, we can run the spark variable for verification.
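
Beyond displaying the object, a few attributes can be checked explicitly. This is a small sketch that assumes the spark variable from the previous step. The helper name describe_session is hypothetical, while version, sparkContext.master and sparkContext.appName are standard PySpark attributes.

```python
def describe_session(spark):
    """Return basic facts about a SparkSession for a quick sanity check."""
    context = spark.sparkContext
    return {
        "version": spark.version,      # Spark version the session runs on
        "master": context.master,      # cluster URL the driver connects to
        "app_name": context.appName,   # application name of the session
    }
```

With the session built above, describe_session(spark) would be expected to report spark://helk-spark-master:7077 as the master. If the kernel already had a session running, getOrCreate() returns that one, so the reported values may differ from the ones passed to the builder.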

Initiate an Elasticsearch Reader

  • In order to start reading from Elasticsearch, we need to use the DataFrameReader class and the read method to read data in as a DataFrame.
  • These basic commands only initialize the DataFrame reader; no data is loaded yet.
es_reader = (spark.read
    .format("org.elasticsearch.spark.sql")
    .option("inferSchema", "true")
    .option("es.read.field.as.array.include", "tags")
    .option("es.nodes", "helk-elasticsearch:9200")
    .option("es.net.http.auth.user", "elastic")
)

Load data from Elasticsearch: Sysmon Index

  • We can then use the load method to load data via the DataFrame reader and return a DataFrame.
  • We can specify a particular index. In this case, I selected the Sysmon index.
sysmon_df = es_reader.load("logs-endpoint-winevent-sysmon-*/")

Filter Sysmon DataFrame

  • We can use the filter method to keep only ProcessCreate events. The select method can then return specific columns from the Sysmon DataFrame.
processcreate_df = sysmon_df.filter(sysmon_df.action == "processcreate")

Show Sysmon ProcessCreate DataFrame
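
The original output is not reproduced here, so the following is only a sketch of how the filtered DataFrame could be displayed. It assumes the processcreate_df variable from the previous step. The helper name show_process_create and the column names other than action are assumptions that may not match your index mapping.

```python
def show_process_create(df, columns=("@timestamp", "computer_name", "action"), rows=10):
    """Print the selected columns for the first rows of a Spark DataFrame."""
    # select() narrows the DataFrame to the given columns; show() is an
    # action, so this is the point where Spark actually runs the query.
    df.select(*columns).show(rows, truncate=False)
```

Calling show_process_create(processcreate_df) prints up to ten rows without truncating long values, which helps with fields such as command lines.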

I hope you are enjoying this series so far. Until now, we have been building the foundations for using Jupyter Notebooks and leveraging the structured data processing capabilities of DataFrames to represent and analyze data. There is a lot more that can be done with the Apache Spark APIs. If you can dream it, you can code it and do it! 😉

In the next post, I will show you how to leverage relational data processing provided by Apache SparkSQL to join a few interesting security events that might not be as interesting or suspicious when analyzed in isolation.

References

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession

https://spark.apache.org/docs/latest/index.html

Chambers, Bill; Zaharia, Matei. Spark: The Definitive Guide: Big Data Processing Made Simple. Kindle Edition.

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html

https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html

https://www.elastic.co/products/hadoop

https://databricks.com/glossary/what-is-spark-sql
