¿Alguna vez te has encontrado con el frustrante problema de errores en la tubería de datos causados por cambios en el esquema de la fuente de datos? Es un problema común que puede interrumpir tu flujo de trabajo y llevar a tiempos de inactividad innecesarios. Pero, ¿qué pasaría si hubiera una forma de manejar los cambios de esquema que no rompen la compatibilidad automáticamente? En esta publicación, exploraremos cómo Delta Lake y Databricks pueden ayudarnos a lograr esto.
Primero, aclaremos lo que queremos decir con cambios que no rompen la compatibilidad. Estos son cambios en los que un cambio de esquema no afectará la integridad del objeto sink. Por ejemplo, agregar una nueva columna a una tabla es un cambio que no rompe la compatibilidad. Por otro lado, cambiar el tipo de datos de una columna de cadena a entero se consideraría un cambio que rompe la compatibilidad, donde se requiere la aplicación del esquema.
Con Delta Lake, podemos tener lo mejor de ambos mundos. Personalmente, prefiero que mi objeto sink contenga el conjunto de columnas más grande en comparación con el objeto fuente. Esto significa que el sink conservará todas las columnas que hayan existido en la fuente, incluso si ya no están presentes. Este enfoque nos permite preservar los datos históricos, a menos que haya requisitos específicos como las regulaciones de GDPR.
Echemos un vistazo a algunos ejemplos de código para entender cómo funciona esto. Suponiendo que ya tienes tus objetos fuente y sink leídos en data frames llamados dfSource y dfSink respectivamente, podemos proceder a comparar sus esquemas.
dfSourceSchema = dfSource.schema
dfSinkSchema = dfSink.schema
schemaMatch = True
for field in dfSourceSchema.fields:
if field not in dfSinkSchema.fields:
schemaMatch = False
break
if not schemaMatch:
dfMergedSchema = dfSink.where("1=0")
# Realiza la fusión utilizando la sintaxis estándar MERGE INTO
# con la notación INSERT/UPDATE *
En el código anterior, comparamos cada campo en los esquemas de la fuente y el sink para determinar si hay algún campo faltante. Si se encuentra una discrepancia, establecemos la variable schemaMatch en False. Luego creamos un dataframe vacío, dfMergedSchema, para contener el esquema fusionado. Esto nos permite usar la sintaxis estándar MERGE INTO para fusionar datos utilizando la notación INSERT/UPDATE *.
Al seguir este enfoque, podemos evolucionar automáticamente el esquema del objeto sink para cambios que no rompen la compatibilidad en nuestro objeto fuente. Esto nos ahorra la molestia de actualizar manualmente el esquema del sink cada vez que hay un cambio en la fuente.
Si estás interesado en explorar el código más a fondo, puedes encontrar el código fuente en el Repositorio de Github.
¡Gracias por leer y feliz gestión de tuberías de datos!