In this post, we use Azure Databricks to analyze access logs and create reports. I had a small project that called for this, so I will share the procedure. The overall flow is as follows: collect logs from Defender ATP and the Office 365 Admin Center and store them in Azure Blob Storage, then use Azure Databricks to analyze, process, and integrate them for viewing in Power BI reports.
The procedure follows the 4 steps below. In **Step 1 & 2**, I do essentially the same processing twice, once with a Pandas DataFrame on my local PC and once with a DataFrame on Azure Databricks, to see the difference between Pandas and Spark DataFrames. Each starts by reading a CSV / JSON file: Pandas is handy for quickly aggregating data of a manageable size, while Databricks is suited to processing large volumes of data. **Step 3** writes the DataFrame data to various formats, either for connecting with Power BI or for keeping a local copy. Finally, **Step 4** covers everything up to viewing the result as a report in Power BI.
- **Step 1:** Read and process CSV files using Python Pandas / Azure Databricks
- **Step 2:** Process JSON files using Azure Databricks
- **Step 3:** Write the processed data to CSV / Parquet / Table
- **Step 4:** Connect to Databricks and create a report that can be viewed from Power BI
First, as a kind of unit test, import a CSV file in the local environment and process and visualize it using Python Pandas. The following entry may be helpful for setting up the Python development environment.
Install the Python environment with Anaconda
Try importing with Pandas.
import pandas as pd
df = pd.read_csv('AuditLog_2020-01-11_2020-04-11.csv')
print(df)
Check the column names and data types to see what information you have.
print(df.columns)
df.dtypes
Index(['CreationDate', 'UserIds', 'Operations', 'AuditData'], dtype='object')
CreationDate object
UserIds object
Operations object
AuditData object
dtype: object
Let's display the first 5 rows.
df.head(5)
                   CreationDate               UserIds    Operations                                          AuditData
0  2020-04-10T21:24:57.0000000Z  [email protected]  UserLoggedIn  {"CreationTime":"2020-04-10T21:24:57","Id":"ba...
1  2020-04-10T20:55:58.0000000Z  [email protected]  FileUploaded  {"CreationTime":"2020-04-10T20:55:58","Id":"80...
2  2020-04-10T20:32:49.0000000Z  [email protected]  UserLoggedIn  {"CreationTime":"2020-04-10T20:32:49","Id":"51...
3  2020-04-10T20:33:39.0000000Z  [email protected]  FileAccessed  {"CreationTime":"2020-04-10T20:33:39","Id":"c0...
4  2020-04-10T19:32:36.0000000Z  [email protected]  UserLoggedIn  {"CreationTime":"2020-04-10T19:32:36","Id":"28...
Since the AuditData column is not used this time, delete the entire column. Adding the "inplace=True" option applies the change to the DataFrame in place.
df.drop("AuditData", axis=1, inplace=True)
The CreationDate column contains date/time data, but it cannot be used as-is, so convert it to the datetime data type.
df['CreationDate'] = pd.to_datetime(df['CreationDate'])
Before conversion: 2020-04-10T21:24:57.0000000Z
After conversion:  2020-04-10 21:24:57
Check the data type. It has been converted to "datetime64".
df.dtypes
CreationDate datetime64[ns]
UserIds object
Operations object
dtype: object
Next, add columns for data we might need when creating Power BI reports. You could create measures on the Power BI side instead, but I expected precomputed columns to improve performance when viewing the report, so I added them as columns.
df['Hour'] = df['CreationDate'].dt.hour
df['Weekday_Name'] = df['CreationDate'].dt.day_name()  # dt.weekday_name in older pandas; use dt.day_name() in current versions
df['DayofWeek'] = df['CreationDate'].dt.dayofweek
Finally, let's check the column names and data types again.
print(df.columns)
df.dtypes
Index(['CreationDate', 'UserIds', 'Operations', 'Hour', 'Weekday_Name', 'DayofWeek'],
dtype='object')
CreationDate datetime64[ns]
UserIds object
Operations object
Hour int64
Weekday_Name object
DayofWeek int64
dtype: object
After confirming, write the result to a CSV file.
df.to_csv('AuditLog_2020-01-11_2020-04-11_edited.csv')
If you only have a limited number of log files to analyze, Pandas is enough, but what if you want to analyze a large amount of log data that doesn't fit in memory at once? Let's do the same thing with an Azure Databricks DataFrame.
Create an Azure Data Lake Storage Gen2 account and upload the CSV file. Reference: "Create an Azure Data Lake Storage Gen2 account"
Load the CSV file into Azure Databricks. A team member's Qiita entry was helpful. Reference: "Mount Data Lake Storage Gen2 from Azure Databricks"
For data handling in Databricks, this Qiita entry was helpful. Reference: "Memo of frequently used operations when handling data with pyspark"
Mount the file system.
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": "<Service Principal Application ID>",
"fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope = "<scope-name>", key = "<key-name>"),
"fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<AAD file system name tenant ID>/oauth2/token",
"fs.azure.createRemoteFileSystemDuringInitialization": "true"}
dbutils.fs.mount(
source = "abfss://auditlog@<Storage account name>.dfs.core.windows.net/",
mount_point = "/mnt/auditdata",
extra_configs = configs)
If it is already mounted and the mount command raises an error, unmount it first.
``` python:Optional
dbutils.fs.unmount("/mnt/auditdata")
```
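To confirm that the mount worked, you can list the mount point; a quick sanity check using the path mounted above.

``` python:Optional
# List the mounted directory to confirm the uploaded CSV file is visible.
display(dbutils.fs.ls("/mnt/auditdata"))
```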
Read the CSV file. By specifying "inferschema='true'" here, the data types are inferred as the data is loaded into the DataFrame.
Spark_df = spark.read.format('csv').options(
header='true', inferschema='true').load("/mnt/auditdata/AuditLog_2020-01-11_2020-04-11.csv")
Check the column names and data types to see what information you have. The Spark DataFrame recognizes CreationDate as a timestamp type.
Spark_df.printSchema()
root
|-- CreationDate: timestamp (nullable = true)
|-- UserIds: string (nullable = true)
|-- Operations: string (nullable = true)
|-- AuditData: string (nullable = true)
Display the first 5 rows. Passing False as the second argument to the show method disables truncation and displays the full contents of each column.
Spark_df.show(5, False)
+-------------------+---------------------+------------+------------------------------------------+
|CreationDate |UserIds |Operations |AuditData |
+-------------------+---------------------+------------+------------------------------------------+
|2020-04-10 21:24:57|[email protected]|UserLoggedIn|"{""CreationTime"":""2020-04-10T21:24:57"|
|2020-04-10 20:55:58|[email protected]|FileUploaded|"{""CreationTime"":""2020-04-10T20:55:58"|
|2020-04-10 20:32:49|[email protected]|UserLoggedIn|"{""CreationTime"":""2020-04-10T20:32:49"|
|2020-04-10 20:33:39|[email protected]|FileAccessed|"{""CreationTime"":""2020-04-10T20:33:39"|
|2020-04-10 19:32:36|[email protected]|UserLoggedIn|"{""CreationTime"":""2020-04-10T19:32:36"|
+-------------------+---------------------+------------+------------------------------------------+
only showing top 5 rows
As before, we'll exclude the AuditData column and add the columns that we might need when creating a Power BI report.
from pyspark.sql.functions import concat, date_format, col, lit
Spark_df = Spark_df.select(
    'CreationDate', 'UserIds', 'Operations',
    date_format('CreationDate', 'HH').alias('Hour'),
    date_format('CreationDate', 'u').alias('DayofWeek'),
    date_format('CreationDate', 'EE').alias('Weekday_Name'))
Spark_df = Spark_df.withColumn("Day_Weekday", concat(col("DayofWeek"), lit('_'), col("Weekday_Name")))
Spark_df.show()
+-------------------+--------------------+-------------------+----+---------+------------+--------+
| CreationDate| UserIds| Operations|Hour|DayofWeek|Weekday_Name|Day_Weekday|
+-------------------+--------------------+-------------------+----+---------+------------+--------+
|2020-04-10 21:24:57|abc@contoso...| UserLoggedIn| 21| 5| Fri| 5_Fri|
|2020-04-10 20:55:58|abc@contoso...| FileUploaded| 20| 5| Fri| 5_Fri|
|2020-04-10 20:32:49|abc@contoso...| UserLoggedIn| 20| 5| Fri| 5_Fri|
|2020-04-10 20:33:39|abc@contoso...| FileAccessed| 20| 5| Fri| 5_Fri|
|2020-04-10 19:32:36|abc@contoso...| UserLoggedIn| 19| 5| Fri| 5_Fri|
Microsoft Defender Advanced Threat Protection (Defender ATP) is a solution for preventing, detecting, investigating, and responding to the various threats that target an enterprise environment. Its Advanced Hunting feature lets you search the data stored in Microsoft Defender Security Center (retained for up to 30 days) under various conditions and use it for analysis.
This time, let's collect Security Center information from Databricks using the REST API and process it in the same way as in Step 1.
To call the Advanced Hunting API from Python, first get an access token.
import json
import urllib.request
import urllib.parse
tenantId = '00000000-0000-0000-0000-000000000000' # Paste your own tenant ID here
appId = '11111111-1111-1111-1111-111111111111' # Paste your own app ID here
appSecret = '22222222-2222-2222-2222-222222222222' # Paste your own app secret here
url = "https://login.windows.net/%s/oauth2/token" % (tenantId)
resourceAppIdUri = 'https://api.securitycenter.windows.com'
body = {
'resource' : resourceAppIdUri,
'client_id' : appId,
'client_secret' : appSecret,
'grant_type' : 'client_credentials'
}
data = urllib.parse.urlencode(body).encode("utf-8")
req = urllib.request.Request(url, data)
response = urllib.request.urlopen(req)
jsonResponse = json.loads(response.read())
aadToken = jsonResponse["access_token"]
Run a Kusto query to get the information. This time we will collect the logs generated when a particular process initiates an event involving a network connection, which lets you track user processes and analyze their activity.
query = 'DeviceNetworkEvents' # Paste your own query here
url = "https://api.securitycenter.windows.com/api/advancedqueries/run"
headers = {
'Content-Type' : 'application/json',
'Accept' : 'application/json',
'Authorization' : "Bearer " + aadToken
}
data = json.dumps({ 'Query' : query }).encode("utf-8")
req = urllib.request.Request(url, data, headers)
response = urllib.request.urlopen(req)
jsonResponse = json.loads(response.read())
schema = jsonResponse["Schema"]
results = jsonResponse["Results"]
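For reference, you can narrow the query instead of pulling the whole DeviceNetworkEvents table. The example below is only a hypothetical sketch that could be passed as `query` above; the time window and process name are assumptions, and the columns come from the schema shown later in this step.

``` python:Optional
# Hypothetical example query: network events from the last 7 days
# initiated by a specific process (the process name is an assumption).
query = '''
DeviceNetworkEvents
| where Timestamp > ago(7d)
| where InitiatingProcessFileName == "chrome.exe"
| project Timestamp, DeviceName, InitiatingProcessFileName, RemoteUrl, RemoteIP, LocalIP
'''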
Store the information obtained from the Advanced Hunting API in a Spark DataFrame.
rddData = sc.parallelize(results)
Spark_df2 = spark.read.json(rddData)
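As a side note, `results` is a list of Python dicts, and their string representation is not strict JSON, so depending on the Spark version some rows may end up in a `_corrupt_record` column (one appears in the schema below). A minimal alternative sketch, assuming you prefer to serialize each record to a JSON string first:

``` python:Optional
# Alternative sketch: serialize each record to JSON before parsing with Spark.
import json

jsonRdd = sc.parallelize([json.dumps(row) for row in results])
Spark_df2 = spark.read.json(jsonRdd)
```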
Check the column names and data types to see what information you have. The date and time information is stored in the Timestamp column, but this time it was not recognized as a timestamp type.
Spark_df2.printSchema()
root
|-- ActionType: string (nullable = true)
|-- AppGuardContainerId: string (nullable = true)
|-- DeviceId: string (nullable = true)
|-- DeviceName: string (nullable = true)
|-- InitiatingProcessAccountDomain: string (nullable = true)
|-- InitiatingProcessAccountName: string (nullable = true)
|-- InitiatingProcessAccountObjectId: string (nullable = true)
|-- InitiatingProcessAccountSid: string (nullable = true)
|-- InitiatingProcessAccountUpn: string (nullable = true)
|-- InitiatingProcessCommandLine: string (nullable = true)
|-- InitiatingProcessCreationTime: string (nullable = true)
|-- InitiatingProcessFileName: string (nullable = true)
|-- InitiatingProcessFolderPath: string (nullable = true)
|-- InitiatingProcessId: long (nullable = true)
|-- InitiatingProcessIntegrityLevel: string (nullable = true)
|-- InitiatingProcessMD5: string (nullable = true)
|-- InitiatingProcessParentCreationTime: string (nullable = true)
|-- InitiatingProcessParentFileName: string (nullable = true)
|-- InitiatingProcessParentId: long (nullable = true)
|-- InitiatingProcessSHA1: string (nullable = true)
|-- InitiatingProcessSHA256: string (nullable = true)
|-- InitiatingProcessTokenElevation: string (nullable = true)
|-- LocalIP: string (nullable = true)
|-- LocalIPType: string (nullable = true)
|-- LocalPort: long (nullable = true)
|-- Protocol: string (nullable = true)
|-- RemoteIP: string (nullable = true)
|-- RemoteIPType: string (nullable = true)
|-- RemotePort: long (nullable = true)
|-- RemoteUrl: string (nullable = true)
|-- ReportId: long (nullable = true)
|-- Timestamp: string (nullable = true)
|-- _corrupt_record: string (nullable = true)
Use "InitiatingProcessFileName" to check the statistics for each process.
Spark_df2.groupBy("InitiatingProcessFileName").count().sort("count", ascending=False).show()
+-------------------------+-----+
|InitiatingProcessFileName|count|
+-------------------------+-----+
| svchost.exe|10285|
| MsSense.exe| 2179|
| chrome.exe| 1693|
| OfficeClickToRun.exe| 1118|
| OneDrive.exe| 914|
| AvastSvc.exe| 764|
| backgroundTaskHos...| 525|
| MicrosoftEdgeCP.exe| 351|
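The same pattern works for other columns; for example, a sketch counting events per remote URL (the column comes from the schema above):

``` python:Optional
# Count events per remote URL, most frequent first.
Spark_df2.groupBy("RemoteUrl").count().sort("count", ascending=False).show(10, False)
```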
Convert the "Timestamp" column to the timestamp type and, to align with Step 1, store it under the column name "CreationDate".
from pyspark.sql.types import TimestampType
Spark_df2 = Spark_df2.withColumn("CreationDate", Spark_df2["Timestamp"].cast(TimestampType()))
Spark_df2.printSchema()
As before, we'll exclude unnecessary columns and add the columns that might be needed when creating a Power BI report.
from pyspark.sql.functions import concat, date_format, col, lit
Spark_df2 = Spark_df2.select(
    'CreationDate', 'DeviceId', 'DeviceName', 'InitiatingProcessFileName',
    'InitiatingProcessAccountName', 'RemoteUrl', 'RemoteIP', 'LocalIP',
    date_format('CreationDate', 'HH').alias('Hour'),
    date_format('CreationDate', 'u').alias('DayofWeek'),
    date_format('CreationDate', 'EE').alias('Weekday_Name'))
Spark_df2 = Spark_df2.withColumn("Day_Weekday", concat(col("DayofWeek"), lit('_'), col("Weekday_Name")))
Spark_df2.show()
Check the column names and data types. The schema is now nice and clean.
Spark_df2.printSchema()
root
|-- CreationDate: timestamp (nullable = true)
|-- DeviceId: string (nullable = true)
|-- DeviceName: string (nullable = true)
|-- InitiatingProcessFileName: string (nullable = true)
|-- InitiatingProcessAccountName: string (nullable = true)
|-- RemoteUrl: string (nullable = true)
|-- RemoteIP: string (nullable = true)
|-- LocalIP: string (nullable = true)
|-- Hour: string (nullable = true)
|-- DayofWeek: string (nullable = true)
|-- Weekday_Name: string (nullable = true)
|-- Day_Weekday: string (nullable = true)
Now that the data looks good, let's write the data processed in Step 1 and Step 2 to various formats.
The Step 1 data is in Spark_df and the Step 2 data is in Spark_df2, so let's write them to CSV files. You can combine the output into a single file with coalesce(1). If you need header information, set the "header" option to "true".
Spark_df.coalesce(1).write.option("header", "true").csv("/mnt/auditdata/AuditLog_2020-01-11_2020-04-11_edited.csv")
Make sure the CSV file has been created in the Azure Data Lake Storage Gen2 storage account mounted on Databricks. When downloading, note that the path you specified is actually created as a folder, and the CSV part file is stored directly under it with a Spark-generated name; a quick way to locate it is shown below.
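A minimal sketch, assuming the output path used above: it lists the folder Spark created and copies the single part file out under a predictable name (the destination file name is an assumption).

``` python:Optional
# The path written above is actually a folder; find the part-*.csv file inside it.
files = dbutils.fs.ls("/mnt/auditdata/AuditLog_2020-01-11_2020-04-11_edited.csv")
part_file = [f.path for f in files if f.name.startswith("part-") and f.name.endswith(".csv")][0]

# Optionally copy it out under a single, predictable file name (name is an assumption).
dbutils.fs.cp(part_file, "/mnt/auditdata/AuditLog_2020-01-11_2020-04-11_single.csv")
```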
(Reference) Reading the CSV file back is as follows.
``` python:Input
#Spark Dataframe
Spark_df = spark.read.format('csv').options(
header='true', inferschema='true').load("/mnt/auditdata/Spark_df.csv")
display(Spark_df)
#pandas
import pandas as pd
pd_dataframe = pd.read_csv('/dbfs/mnt/auditdata/Spark_df.csv')
```
Try writing in Parquet format as well.
Spark_df.write.mode("append").parquet("/mnt/auditdata/parquet/audit")
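If the data keeps growing, partitioning the Parquet output can help downstream queries skip files. A sketch, assuming Weekday_Name is a useful partition key for your reports (the output path is also an assumption):

``` python:Optional
# Write Parquet partitioned by Weekday_Name; each distinct value becomes a subfolder.
Spark_df.write.mode("append").partitionBy("Weekday_Name").parquet("/mnt/auditdata/parquet/audit_by_weekday")
```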
(Reference) Reading the Parquet files back is as follows.
``` python:Input
#Python
data = sqlContext.read.parquet("/mnt/auditdata/parquet/audit")
display(data)
#Scala
%scala
val data = sqlContext.read.parquet("/mnt/auditdata/parquet/audit")
display(data)
#SQL
%sql
CREATE TEMPORARY TABLE scalaTable
USING parquet
OPTIONS (
path "/mnt/auditdata/parquet/audit"
);
SELECT * FROM scalaTable
```
Try writing in Databricks Table format as well.
Spark_df.write.saveAsTable("worktime")
worktime = spark.sql("select * from worktime")
display(worktime.select("*"))
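The Step 2 data can be saved the same way so that it also appears when connecting from Power BI later; the table name below is an assumption.

``` python:Optional
# Save the Defender ATP DataFrame as its own table (table name is an assumption).
Spark_df2.write.saveAsTable("networkevents")
display(spark.sql("select * from networkevents limit 10"))
```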
Finally, let's create a report that can be viewed in Power BI using the data so far.
Open the Databricks workspace from the Azure portal, display "Clusters" in the left panel, and select the cluster that is running the table you want to connect to.
In the cluster settings panel, select Advanced Options to display the JDBC / ODBC menu.
The settings screen shows the Hostname, Port, and HTTP Path. Use this information to build the connection string in the following form.
https://<Hostname>:<Port>/<HTTP Path>
Specifically, it should be a string like the one below.
Server : https://xxx-xxx.1.azuredatabricks.net:443/sql/protocolv1/o/687887143797341/xxxx-xxxxxx-xxxxxxxx
On the Databricks workspace management screen, click the user profile icon in the upper right and click "User Settings".
Click the "Access Tokens" tab and click the "Generate New Token" button.
On the "Generate New Token" screen, write "Power BI" in the "Comment" field. It's an option so you don't have to write it.
Click the "Generate" button and copy and save the created token.
Launch Power BI Desktop and select "Spark" as the destination data source from "Get Data".
In the Spark connection settings, paste the connection string you built earlier into the "Server" field. Select "HTTP" as the protocol and "DirectQuery" as the connection mode, then click the "OK" button.
Next, enter "token" in the "User name" field and paste the access token you saved earlier into the "Password" field, then click the "Connect" button.
The list of tables created in Step 3 will be displayed. Select the table required for the Power BI report and click the "Load" button.
Using the data prepared in Steps 1 to 3, I finally made a report like this in Power BI Desktop.
This time, I walked through log analysis and visualization using Databricks. It feels like we're only tapping a fraction of Databricks' potential; it should really show its strength where distributed processing is needed over a data lake that has accumulated a large amount of data.
Even so, I felt that it is a versatile processing platform that supports languages such as Scala, Python, R, and SQL, along with stream processing, machine learning, and visualization, and that its integration with various Azure services, including Power BI, is also a great strength.
I can confidently recommend Azure Databricks to anyone who has data but is unsure how to use it, or who is struggling with data processing.
I am also interested in integrating with Azure SQL Database and Cosmos DB, so I will try that next time.