Published on

September 12, 2023

Construction de pipelines ELT efficaces et fiables en temps réel dans Azure avec Delta Live Tables

De nombreux clients d’Azure ont besoin de pipelines ELT en temps réel performants, évolutifs, maintenables, fiables et testables pour leurs data lakehouses. Ces pipelines doivent prendre en charge des scripts ELT personnalisés, l’orchestration des tâches, la surveillance, et bien plus encore. Heureusement, Azure propose des outils robustes tels que Data Factory et Databricks qui offrent un support pour bon nombre de ces fonctionnalités ELT. En particulier, Delta Live Tables de Databricks permet aux ingénieurs de données de définir des pipelines de données en temps réel à l’aide de tâches Apache Spark.

Delta Live Tables dans Azure Databricks permet aux développeurs de planifier et de surveiller des tâches, de gérer des clusters, de gérer les erreurs et d’appliquer des normes de qualité des données sur des données en temps réel avec facilité. Il offre une visibilité sur les pipelines opérationnels avec des fonctionnalités intégrées de gouvernance, de versioning et de documentation pour suivre visuellement les statistiques et la généalogie des données. De plus, Delta Live Tables prend en charge l’ingestion de données en continu via Auto Loader.

Dans cet article, nous allons explorer comment commencer avec Delta Live Tables pour créer des définitions de pipelines dans vos notebooks Databricks. Nous apprendrons comment ingérer des données dans le data lakehouse et construire de manière déclarative des pipelines en temps réel pour transformer les données brutes et agréger les données métier pour obtenir des informations et des analyses. Nous aborderons également la mise en œuvre d’attentes et de vérifications de qualité des données déclaratives dans votre pipeline, l’ajout de commentaires pour la documentation dans les pipelines, la curation des données brutes et leur préparation pour une analyse ultérieure en utilisant soit la syntaxe SQL, soit la syntaxe PySpark. Enfin, nous plongerons dans la création, la configuration et l’exécution de pipelines et de tâches Delta Live Tables.

Créer un notebook

La première étape pour créer un pipeline Delta Live Table (DLT) consiste à créer un nouveau notebook Databricks attaché à un cluster. Delta Live Tables prend en charge les langages de notebook Python et SQL. Voici un exemple de notebook DLT contenant trois sections de scripts pour les trois étapes du processus ELT de ce pipeline :

CREATE LIVE TABLE nyctaxi_raw
COMMENT "Il s'agit de l'ensemble de données brutes nyctaxi au format Delta."
SELECT * FROM delta.`/mnt/raw/delta/Factnyctaxi`

CREATE LIVE TABLE Factnyctaxi_staging(
  CONSTRAINT valid_VendorID EXPECT (VendorID IS NOT NULL),
  CONSTRAINT valid_passenger_count EXPECT (passenger_count > 0) ON VIOLATION DROP ROW
)
COMMENT "Données nyctaxi nettoyées et préparées pour l'analyse."
AS SELECT
  VendorID AS ID,
  CAST(passenger_count AS INT) AS Count,
  total_amount AS Amount,
  trip_distance AS Distance,
  tpep_pickup_datetime AS PickUp_Datetime,
  tpep_dropoff_datetime AS DropOff_Datetime
FROM live.nyctaxi_raw

CREATE LIVE TABLE Factnyctaxi
COMMENT "La table Factnyc curée contenant des comptages agrégés, des montants et des données de distance."
AS SELECT
  VendorID AS ID,
  tpep_pickup_datetime AS PickUp_Datetime,
  tpep_dropoff_datetime AS DropOff_Datetime,
  CAST(passenger_count AS INT) AS Count,
  total_amount AS Amount,
  trip_distance AS Distance
FROM live.Factnyctaxi_staging
WHERE tpep_pickup_datetime BETWEEN '2019-03-01 00:00:00' AND '2020-03-01 00:00:00'
AND passenger_count IS NOT NULL
GROUP BY VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, CAST(passenger_count AS INT), total_amount, trip_distance
ORDER BY VendorID ASC

Ce code SQL peut également être écrit en Python si nécessaire. Pour utiliser les fonctions et types SQL de PySpark, vous devrez importer les bibliothèques nécessaires :

import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

Tout comme la fonction EXPECT SQL dans le script de notebook SQL DLT ci-dessus, avec PySpark, vous pouvez gérer les violations de lignes en fonction des attentes à l’aide de commandes telles que :

  • expect : Si une ligne viole l’attente, inclure la ligne dans l’ensemble de données cible.
  • expect_or_drop : Si une ligne viole l’attente, supprimer la ligne de l’ensemble de données cible.
  • expect_or_fail : Si une ligne viole l’attente, arrêter immédiatement l’exécution.
  • expect_all : Si une ligne viole l’une des attentes, inclure la ligne dans l’ensemble de données cible.
  • expect_all_or_drop : Si une ligne viole l’une des attentes, supprimer la ligne de l’ensemble de données cible.
  • expect_all_or_fail : Si une ligne viole l’une des attentes, arrêter immédiatement l’exécution.

Créer et exécuter un pipeline

Un pipeline dans Delta Live Tables est un graphe acyclique dirigé (DAG) qui relie les sources de données aux ensembles de données cibles. Une fois que vous avez créé le contenu des ensembles de données DLT à l’aide de requêtes SQL, vous pouvez créer un pipeline. Les propriétés de configuration du pipeline comprennent le nom du pipeline, l’emplacement où le code du notebook DLT est stocké, l’emplacement de stockage, le mode du pipeline et les spécifications du cluster. Vous pouvez également ajouter des bibliothèques de notebooks si vos scripts sont répartis sur plusieurs notebooks.

Voici un exemple de script JSON d’un pipeline :

{
  "name": "Pipeline de données DLT NYCTaxi",
  "storage": "/mnt/data/raw/Factnyctaxi",
  "clusters": [
    {
      "num_workers": 1,
      "spark_conf": {}
    }
  ],
  "libraries": [
    {
      "notebook": {
         "path": "/Users/ronlesteve/dlt/Factnyctaxi"
      }
    }
  ],
  "continuous": false
}

Après avoir créé le pipeline, vous pouvez le configurer, le démarrer et le surveiller dans vos environnements de développement et de production. L’interface utilisateur des détails du pipeline affiche l’état de démarrage, d’exécution et de terminaison des étapes du pipeline. Elle suit également les dépendances entre les tâches pour afficher clairement la généalogie.

Une fois que le pipeline a terminé son exécution, il affiche les métriques liées aux métadonnées qui ont été suivies pour la tâche. Ces métriques comprennent le nombre de lignes insérées dans la table et les métriques liées aux attentes. Par exemple, si des enregistrements ont échoué ou été supprimés, cela sera suivi ici.

Planifier un pipeline

Une fois que votre pipeline a été créé et testé avec succès, vous pouvez créer une tâche qui spécifie le pipeline en tant que tâche au sein de la tâche. Vous pouvez personnaliser la planification et configurer les tentatives de répétition et les exécutions simultanées selon vos besoins. De plus, vous avez la possibilité de personnaliser et d’envoyer des alertes relatives à l’état de la tâche à une adresse e-mail spécifiée.

Explorer les journaux d’événements

Des journaux d’événements sont créés et conservés pour tous les pipelines Delta Live Table. Ces journaux contiennent des données relatives aux journaux d’audit, aux vérifications de qualité des données, à l’avancement du pipeline et à la généalogie des données pour le suivi et la surveillance de vos pipelines. Vous pouvez créer des vues pour interroger et analyser ces métriques d’événements.

Voici un exemple de création d’une vue pour les métriques d’événements système :

ADLSg2Path = "/mnt/raw/data/NycTaxidata"
df = spark.read.format("delta").load(f"{ADLSg2Path}/system/events")
df.createOrReplaceTempView("dlteventmetrics")

Une fois que la vue est créée, vous pouvez écrire des scripts PySpark ou SQL pour afficher les métriques liées aux journaux d’audit. Par exemple :

latest_update_id = spark.sql("SELECT origin.update_id FROM dlteventmetrics WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1").collect()[0].update_id
spark.conf.set('latest_update.id', latest_update_id)

Ces journaux d’événements fournissent des informations précieuses pour le suivi des performances, de l’état, de la qualité, de la latence, etc. Vous pouvez créer des requêtes personnalisées pour extraire des métriques spécifiques et créer des tableaux de bord visuellement attrayants dans Databricks ou Power BI pour les rapports.

Résumé

Delta Live Tables dans Azure Databricks offre une solution puissante pour la construction de pipelines ELT efficaces et fiables en temps réel pour les data lakehouses. Avec Delta Expectations, vous pouvez garantir une haute qualité et une cohérence des données au sein de la Lakehouse. Les tâches planifiées pour le traitement des pipelines DLT permettent la récupération et la gestion des erreurs, ainsi qu’une alerte robuste de l’état de la tâche. L’interface utilisateur prête à l’emploi permet de surveiller facilement les étapes du pipeline, et des tableaux de bord et des métriques supplémentaires peuvent être créés pour personnaliser davantage les visualisations et les rapports des métriques d’événements.

À mesure que vous continuez à développer vos pipelines DLT, vous pouvez explorer des fonctionnalités supplémentaires telles que la paramétrisation des pipelines pour un framework dynamique et l’optimisation des performances et des coûts des requêtes en supprimant les anciennes versions des tables. Delta Live Tables simplifie et ajoute une planification et un suivi robustes pour les tâches et les pipelines ELT, vous permettant d’obtenir plus rapidement des informations précieuses à partir de vos données.

Article mis à jour pour la dernière fois le 2022-04-06

Click to rate this post!
[Total: 0 Average: 0]

Let's work together

Send us a message or book free introductory meeting with us using button below.