Utilisez Azure Databricks pour analyser les journaux d'accès et créer des rapports. J'avais un petit projet appelé, donc je vais partager la procédure. L'aperçu général est comme ça. Collectez les journaux de Defender ATP et du centre d'administration Office 365 et stockez-les dans Azure Blob Storage. Nous utilisons Azure Databricks pour analyser, traiter et intégrer pour l'affichage dans les rapports Power BI.
Suivez les 4 étapes ci-dessous. Dans ** Étape 1 et 2 , j'ai essayé fondamentalement la même chose avec Pandas DataFrame sur le PC local et DataFrame sur Azure Databricks pour voir la différence entre Pandas et Spark Dataframe. Commencez par lire chaque CSV / JSON. Essayez Pandas pour une agrégation facile de données de taille pratique et Databricks pour traiter de gros volumes de données. ** L'étape 3 ** écrit les données Dataframe dans différents formats pour se connecter à Power BI ou pour les garder à portée de main. Enfin, à l ' Étape 4 **, de Power BI à l'affichage sous forme de rapport. Assure-toi.
Tout d'abord, imaginez un test unitaire, importez un fichier CSV dans l'environnement local, puis traitez-le et visualisez-le à l'aide de Python Pandas. Les entrées suivantes peuvent être utiles pour l'environnement de développement Python.
Installer l'environnement Python avec Anaconda
Essayez de capturer avec des Pandas.
import pandas as pd
df = pd.read_csv('AuditLog_2020-01-11_2020-04-11.csv')
print(df)
Vérifiez le nom de la colonne et le type de type pour voir les informations dont vous disposez.
print(df.columns)
df.dtypes
Index(['CreationDate', 'UserIds', 'Operations', 'AuditData'], dtype='object')
CreationDate object
UserIds object
Operations object
AuditData object
dtype: object
Essayez d'afficher les 5 premières lignes.
df.head(5)
CreationDate UserIds Operations AuditData
0 2020-04-10T21:24:57.0000000Z [email protected] UserLoggedIn {"CreationTime":"2020-04-10T21:24:57","Id":"ba..." 1 2020-04-10T20:55:58.0000000Z [email protected] FileUploaded {"CreationTime":"2020-04-10T20:55:58","Id":"80..." 2 2020-04-10T20:32:49.0000000Z [email protected] UserLoggedIn {"CreationTime":"2020-04-10T20:32:49","Id":"51..." 3 2020-04-10T20:33:39.0000000Z [email protected] FileAccessed {"CreationTime":"2020-04-10T20:33:39","Id":"c0..." 4 2020-04-10T19:32:36.0000000Z [email protected] UserLoggedIn {"CreationTime":"2020-04-10T19:32:36","Id":"28..." ```
Puisque les données de la colonne AuditData ne sont pas utilisées cette fois, la colonne entière sera supprimée. Vous pouvez refléter les modifications dans le DataFrame en ajoutant l'option "inplace = True".
df.drop("AuditData", axis=1, inplace=True)
Les données de date / heure sont écrites dans la colonne CreationDate, mais elles ne peuvent pas être utilisées telles quelles, donc convertissez-les en type de données date / heure.
df['CreationDate'] = pd.to_datetime(df['CreationDate'])
Avant utilisation: 2020-04-10T21:24:57.0000000Z Après utilisation: 2020-04-10 21:24:57 ```
Vérifiez le type de données. Il a été converti en "datetime64".
df.dtypes
CreationDate datetime64[ns]
UserIds object
Operations object
dtype: object
Nous aurons des colonnes avec des données dont nous pouvons avoir besoin lors de la création de rapports Power BI. Il est possible de créer une mesure côté Power BI, mais j'ai pensé que cela améliorerait les performances lors de la visualisation du rapport, je l'ai donc mise dans une colonne.
df['Hour'] = df['CreationDate'].dt.hour
df['Weekday_Name'] = df['CreationDate'].dt.weekday_name
df['DayofWeek'] = df['CreationDate'].dt.dayofweek
Enfin, vérifions le nom de la colonne et le type de type.
print(df.columns)
df.dtypes
Index(['CreationDate', 'UserIds', 'Operations', 'Hour', 'Weekday_Name', 'DayofWeek'],
dtype='object')
CreationDate datetime64[ns]
UserIds object
Operations object
Hour int64
Weekday_Name object
DayofWeek int64
dtype: object
```
Après confirmation, écrivez le résultat dans un fichier CSV.
df.to_csv('AuditLog_2020-01-11_2020-04-11_edited.csv')
Si vous avez un nombre limité de fichiers journaux à analyser, Pandas est très bien, mais que se passe-t-il si vous souhaitez analyser une grande quantité de données de journal qui ne tiennent pas en mémoire en même temps? Essayez de faire de même avec Azure Databricks DataFrame.
Créez un compte Azure Data Lake Storage Gen2 et téléchargez le fichier CSV. Référence: voir Créer un compte Azure Data Lake Storage Gen2. S'il vous plaît.
Chargez le fichier CSV dans Azure Databricks. L'entrée Qiita du membre de l'équipe a été utile. Référence: "Montage de Data Lake Storage Gen2 à partir d'Azure Databricks"
Pour la gestion des données dans Databricks, cette entrée Qiita a été utile. Référence: "Mémo de celui souvent utilisé lors du traitement des données avec pyspark"
Montez le système de fichiers.
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": "<ID de l'application principale du service>",
"fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope = "<scope-name>", key = "<key-name>"),
"fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<ID de locataire du nom du système de fichiers AAD>/oauth2/token",
"fs.azure.createRemoteFileSystemDuringInitialization": "true"}
dbutils.fs.mount(
source = "abfss://auditlog@<Nom du compte de stockage>.dfs.core.windows.net/",
mount_point = "/mnt/auditdata",
extra_configs = configs)
S'il est déjà monté et qu'une erreur se produit, démontez-le une fois.
``` python:Optional
dbutils.fs.unmount("/mnt/auditdata") ```
Lisez le fichier CSV. En spécifiant ici "inferschema = 'true'", le type de type est déduit et les données sont stockées dans le Dataframe.
Spark_df = spark.read.format('csv').options(
header='true', inferschema='true').load("/mnt/auditdata/AuditLog_2020-01-11_2020-04-11.csv")
Vérifiez le nom de la colonne et le type de type pour voir les informations dont vous disposez. Spark Dataframe reconnaît CreationDate comme type d'horodatage.
Spark_df.printSchema()
root
|-- CreationDate: timestamp (nullable = true)
|-- UserIds: string (nullable = true)
|-- Operations: string (nullable = true)
|-- AuditData: string (nullable = true)
Essayez d'afficher les 5 premières lignes. Spécifier False pour la méthode show supprime l'option Tronquer et affiche l'intégralité du contenu des données de la colonne.
Spark_df.show(5, False)
+-------------------+---------------------+------------+------------------------------------------+
|CreationDate |UserIds |Operations |AuditData |
+-------------------+---------------------+------------+------------------------------------------+
|2020-04-10 21:24:57|[email protected]|UserLoggedIn|"{""CreationTime"":""2020-04-10T21:24:57"|
|2020-04-10 20:55:58|[email protected]|FileUploaded|"{""CreationTime"":""2020-04-10T20:55:58"|
|2020-04-10 20:32:49|[email protected]|UserLoggedIn|"{""CreationTime"":""2020-04-10T20:32:49"|
|2020-04-10 20:33:39|[email protected]|FileAccessed|"{""CreationTime"":""2020-04-10T20:33:39"|
|2020-04-10 19:32:36|[email protected]|UserLoggedIn|"{""CreationTime"":""2020-04-10T19:32:36"|
+-------------------+---------------------+------------+------------------------------------------+
only showing top 5 rows
Comme précédemment, nous allons exclure la colonne AuditData et faire en sorte que la colonne contienne les données dont nous pourrions avoir besoin lors de la création d'un rapport Power BI.
from pyspark.sql.functions import concat, date_format, col, lit
Spark_df = Spark_df.select('CreationDate', 'UserIds', 'Operations', date_format('CreationDate', 'HH').alias('Hour'),date_format('CreationDate', 'u').alias('DayofWeek'), date_format('CreationDate', 'EE').alias('Weekday_Name'))
Spark_df = Spark_df.withColumn("Day_Weekday",concat(col("DayofWeek"),lit('_'),col("Weekday_Name")))
Spark_df.show()
+-------------------+--------------------+-------------------+----+---------+------------+--------+
| CreationDate| UserIds| Operations|Hour|DayofWeek|Weekday_Name|Day_Weekday|
+-------------------+--------------------+-------------------+----+---------+------------+--------+
|2020-04-10 21:24:57|abc@contoso...| UserLoggedIn| 21| 5| Fri| 5_Fri|
|2020-04-10 20:55:58|abc@contoso...| FileUploaded| 20| 5| Fri| 5_Fri|
|2020-04-10 20:32:49|abc@contoso...| UserLoggedIn| 20| 5| Fri| 5_Fri|
|2020-04-10 20:33:39|abc@contoso...| FileAccessed| 20| 5| Fri| 5_Fri|
|2020-04-10 19:32:36|abc@contoso...| UserLoggedIn| 19| 5| Fri| 5_Fri|
Il existe une solution appelée Microsoft Defender Advanced Threat Protection (DATP) qui peut éviter, détecter, enquêter et traiter diverses menaces qui menacent l'environnement de l'entreprise, mais avec la fonction appelée Advanced Hunting, le Microsoft Defender Security Center peut être utilisé. Vous pouvez rechercher les données stockées jusqu'à 30 jours dans diverses conditions et les utiliser à des fins d'analyse.
Cette fois, collectons les informations de Security Center auprès de Databricks à l'aide de l'API REST et traitons-les de la même manière qu'à l'étape 1.
Pour appeler l'API Advanced Hunting depuis Python, commencez par obtenir un jeton d'accès.
import json
import urllib.request
import urllib.parse
tenantId = '00000000-0000-0000-0000-000000000000' # Paste your own tenant ID here
appId = '11111111-1111-1111-1111-111111111111' # Paste your own app ID here
appSecret = '22222222-2222-2222-2222-222222222222' # Paste your own app secret here
url = "https://login.windows.net/%s/oauth2/token" % (tenantId)
resourceAppIdUri = 'https://api.securitycenter.windows.com'
body = {
'resource' : resourceAppIdUri,
'client_id' : appId,
'client_secret' : appSecret,
'grant_type' : 'client_credentials'
}
data = urllib.parse.urlencode(body).encode("utf-8")
req = urllib.request.Request(url, data)
response = urllib.request.urlopen(req)
jsonResponse = json.loads(response.read())
aadToken = jsonResponse["access_token"]
Exécutez une requête Kusto pour obtenir les informations. Cette fois, nous collecterons des journaux lorsqu'un processus particulier initie un événement impliquant une connexion réseau. Vous pouvez suivre les processus utilisateur et analyser l'activité.
query = 'DeviceNetworkEvents' # Paste your own query here
url = "https://api.securitycenter.windows.com/api/advancedqueries/run"
headers = {
'Content-Type' : 'application/json',
'Accept' : 'application/json',
'Authorization' : "Bearer " + aadToken
}
data = json.dumps({ 'Query' : query }).encode("utf-8")
req = urllib.request.Request(url, data, headers)
response = urllib.request.urlopen(req)
jsonResponse = json.loads(response.read())
schema = jsonResponse["Schema"]
results = jsonResponse["Results"]
Stockez les informations obtenues à partir de l'API Advanced Hunting dans Spark Dataframe.
rddData = sc.parallelize(results)
Spark_df2 = spark.read.json(rddData)
Vérifiez le nom de la colonne et le type de type pour voir les informations dont vous disposez. Les informations de date et d'heure sont stockées dans Timestamp, mais cette fois, elles n'ont pas été reconnues par le type d'horodatage.
Spark_df2.printSchema()
root
|-- ActionType: string (nullable = true)
|-- AppGuardContainerId: string (nullable = true)
|-- DeviceId: string (nullable = true)
|-- DeviceName: string (nullable = true)
|-- InitiatingProcessAccountDomain: string (nullable = true)
|-- InitiatingProcessAccountName: string (nullable = true)
|-- InitiatingProcessAccountObjectId: string (nullable = true)
|-- InitiatingProcessAccountSid: string (nullable = true)
|-- InitiatingProcessAccountUpn: string (nullable = true)
|-- InitiatingProcessCommandLine: string (nullable = true)
|-- InitiatingProcessCreationTime: string (nullable = true)
|-- InitiatingProcessFileName: string (nullable = true)
|-- InitiatingProcessFolderPath: string (nullable = true)
|-- InitiatingProcessId: long (nullable = true)
|-- InitiatingProcessIntegrityLevel: string (nullable = true)
|-- InitiatingProcessMD5: string (nullable = true)
|-- InitiatingProcessParentCreationTime: string (nullable = true)
|-- InitiatingProcessParentFileName: string (nullable = true)
|-- InitiatingProcessParentId: long (nullable = true)
|-- InitiatingProcessSHA1: string (nullable = true)
|-- InitiatingProcessSHA256: string (nullable = true)
|-- InitiatingProcessTokenElevation: string (nullable = true)
|-- LocalIP: string (nullable = true)
|-- LocalIPType: string (nullable = true)
|-- LocalPort: long (nullable = true)
|-- Protocol: string (nullable = true)
|-- RemoteIP: string (nullable = true)
|-- RemoteIPType: string (nullable = true)
|-- RemotePort: long (nullable = true)
|-- RemoteUrl: string (nullable = true)
|-- ReportId: long (nullable = true)
|-- Timestamp: string (nullable = true)
|-- _corrupt_record: string (nullable = true)
Utilisez "InitiatingProcessFileName" pour vérifier les statistiques de chaque processus.
Spark_df2.groupBy("InitiatingProcessFileName").count().sort("count", ascending=False).show()
+-------------------------+-----+
|InitiatingProcessFileName|count|
+-------------------------+-----+
| svchost.exe|10285|
| MsSense.exe| 2179|
| chrome.exe| 1693|
| OfficeClickToRun.exe| 1118|
| OneDrive.exe| 914|
| AvastSvc.exe| 764|
| backgroundTaskHos...| 525|
| MicrosoftEdgeCP.exe| 351|
Convertissez le type de données de la colonne "Horodatage" en type Horodatage et enregistrez-le avec l'étape 1 avec le nom de colonne "CreationDate".
from pyspark.sql.types import TimestampType
Spark_df2 = Spark_df2.withColumn("CreationDate", Spark_df2["Timestamp"].cast(TimestampType()))
Spark_df2.printSchema()
Comme précédemment, nous allons exclure les colonnes inutiles et avoir des colonnes avec des données qui peuvent être nécessaires lors de la création d'un rapport Power BI.
from pyspark.sql.functions import concat, date_format, col, lit
Spark_df2 = Spark_df2.select('CreationDate', 'DeviceId', 'DeviceName', 'InitiatingProcessFileName', 'InitiatingProcessAccountName', 'RemoteUrl', 'RemoteIP', 'LocalIP', date_format('CreationDate', 'HH').alias('Hour'),date_format('CreationDate', 'u').alias('DayofWeek'), date_format('CreationDate', 'EE').alias('Weekday_Name'))
Spark_df2 = Spark_df2.withColumn("Day_Weekday",concat(col("DayofWeek"),lit('_'),col("Weekday_Name")))
Spark_df2.show()
Vérifiez le nom de la colonne et le type de type. C'était rafraîchissant.
Spark_df2.printSchema()
root
|-- CreationDate: timestamp (nullable = true)
|-- DeviceId: string (nullable = true)
|-- DeviceName: string (nullable = true)
|-- InitiatingProcessFileName: string (nullable = true)
|-- InitiatingProcessAccountName: string (nullable = true)
|-- RemoteUrl: string (nullable = true)
|-- RemoteIP: string (nullable = true)
|-- LocalIP: string (nullable = true)
|-- Hour: string (nullable = true)
|-- DayofWeek: string (nullable = true)
|-- Weekday_Name: string (nullable = true)
|-- Day_Weekday: string (nullable = true)
Maintenant que cela semble bon, écrivons les données traitées aux étapes 1 et 2 dans différents formats.
Puisque les données de l'étape 1 sont créées dans Spark_df et que les données de l'étape 2 sont créées dans Spark_df2, écrivons-les dans le fichier CSV. Vous pouvez combiner les fichiers de sortie avec coalesce (1). Si vous avez besoin d'informations d'en-tête, définissez-le sur "true" comme option.
Spark_df.coalesce(1).write.option("header", "true").csv("/mnt/auditdata/AuditLog_2020-01-11_2020-04-11_edited.csv")
Assurez-vous que le fichier CSV est créé dans le compte de stockage Azure Data Lake Storage Gen2 monté sur Databricks. Lorsque je le télécharge, il semble que le fichier CSV soit stocké directement sous le dossier avec le nom de fichier spécifié.
(Référence) La lecture CSV est la suivante
``` python:Input
#Spark Dataframe
Spark_df = spark.read.format('csv').options(
header='true', inferschema='true').load("/mnt/auditdata/Spark_df.csv")
display (Spark_df)
#pandas
import pandas as pd
pd_dataframe = pd.read_csv('/dbfs/mnt/auditdata/Spark_df.csv')
```
Essayez également d'écrire au format Parquet.
Spark_df.write.mode("append").parquet("/mnt/auditdata/parquet/audit")
(Référence) La lecture de Parquet est la suivante
``` python:Input
#Python
data = sqlContext.read.parquet("/mnt/auditdata/parquet/audit")
display(data)
#Scala
%scala
val data = sqlContext.read.parquet("/mnt/auditdata/parquet/audit")
display(data)
#SQL
%sql
CREATE TEMPORARY TABLE scalaTable
USING parquet
OPTIONS (
path "/mnt/auditdata/parquet/audit"
)
SELECT * FROM scalaTable
```
Essayez également d'écrire au format Databricks Table.
Spark_df.write.saveAsTable("worktime")
worktime = spark.sql("select * from worktime")
display(worktime.select("*"))
Enfin, créons un rapport en utilisant les données jusqu'à présent afin qu'il puisse être affiché dans Power BI.
Démarrez Databricks Workspace à partir du portail Azure, affichez «Cluster» dans le panneau de gauche et sélectionnez le cluster exécutant la table auquel se connecter.
Dans le panneau des paramètres du cluster, sélectionnez Options avancées pour afficher le menu JDBC / ODBC.
L'écran des paramètres contient les informations suivantes.
Utilisez ces informations pour obtenir la chaîne de caractères du paramètre de destination de la connexion.
https://<Hostname>:<Port>/<HTTP Path>
Plus précisément, il doit s'agir d'une chaîne de caractères comme celle ci-dessous.
Server : https://xxx-xxx.1.azuredatabricks.net:443/sql/protocolv1/o/687887143797341/xxxx-xxxxxx-xxxxxxxx
Sur l'écran de gestion de l'espace de travail Databrick, cliquez sur l'icône de profil utilisateur en haut à droite et cliquez sur Paramètres utilisateur.
Cliquez sur l'onglet "Jetons d'accès" et cliquez sur le bouton "Générer un nouveau jeton".
Sur l'écran "Générer un nouveau jeton", écrivez "Power BI" dans le champ "Commentaire". C'est une option pour que vous n'ayez pas à l'écrire.
Cliquez sur le bouton "Générer" et copiez et enregistrez le jeton créé.
Lancez Power BI Desktop et sélectionnez «Spark» comme source de données de destination dans «Obtenir les données».
Dans les paramètres de connexion Spark, collez la chaîne de caractères du paramètre de destination de connexion obtenue précédemment dans le champ "Serveur". Sélectionnez "HTTP" comme protocole et "Requête directe" comme mode de connexion, puis cliquez sur le bouton "OK".
Dans les paramètres de connexion Spark, saisissez "token" dans le champ "User name" et collez le mot de passe que vous avez obtenu précédemment. Cliquez sur le bouton "Connecter".
La liste des tables créées à l'étape 3 s'affiche. Sélectionnez la table requise pour le rapport Power BI et cliquez sur le bouton "Charger".
À l'aide des données préparées aux étapes 1 à 3, j'ai finalement créé un rapport avec Power BI Desktop comme celui-ci.
Cette fois, j'ai essayé de procéder à l'analyse et à la visualisation des journaux à l'aide de Databricks. On a l'impression que nous n'exploitons qu'une partie du potentiel de Databricks. En réalité, il doit pouvoir démontrer son vrai potentiel dans des situations où un traitement distribué est requis pour Data Lake, qui a accumulé une grande quantité de données.
Néanmoins, il s'agit d'une plate-forme de traitement polyvalente qui peut être utilisée dans n'importe quel langage tel que Scala, Python, R, SQL, et il est merveilleux qu'elle puisse effectuer le traitement de flux, l'apprentissage automatique, la visualisation et qu'elle puisse également être liée à divers services Azure, y compris Power BI. J'ai ressenti cela.
Nous recommandons Azure Databricks en toute confiance à quiconque possède des données mais se demande comment les utiliser ou rencontre des problèmes de traitement des données.
J'étais également intéressé par la liaison avec la base de données Azure SQL et Cosmos DB, je vais donc l'essayer la prochaine fois.