In this article, we will explore how to use Azure Data Factory (ADF) and a custom script to load data into an Azure MySQL database when the source CSV files have different schemas. This scenario often arises when dealing with files from multiple vendors, each with its own column names and column order.
Out of the box, Azure Data Factory does not provide a straightforward way to load data for this scenario. However, we can use a custom activity in ADF to handle tasks that its built-in activities cannot accomplish directly.
The Problem
Let’s say we have files coming from over 100 vendors, each with different column formats and orders. These files need to be loaded into a default schema in a MySQL database on Azure. Whenever a new vendor is added, their file should be automatically accommodated by the solution.
The Challenges
If we built this solution with native Azure Data Factory components, we would need roughly 100 source datasets and destination sinks, and each new vendor would require a new pipeline. In addition, to make the column mapping dynamic, the column names stored in the database need to be read in a single step, but ADF only lets us read the first row or iterate over all rows with a ForEach loop.
The Solution
To implement this solution, we will use the following services in Azure:
- Azure Batch Account
- Azure Data Factory – Custom Activity
- Azure MySQL database
- Azure Blob Storage
We will keep the CSV files in Azure Blob Storage and copy the storage account key to a text file for configuration. We also need an Azure Batch account; creating the account itself is free, and the cost comes from the VMs that run the batch jobs. We can use low-priority VMs, which are priced much lower than regular VMs.
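Because the script will run on the Batch pool nodes, it is convenient to read the storage connection string from that configuration text file rather than hard-coding it. Below is a minimal sketch of how the script could do this; the file name config.txt and its single-line format are assumptions for illustration.
# Minimal sketch: read the Blob Storage connection string from a config file
# (the file name "config.txt" and its one-line format are assumptions for illustration)
def load_connection_string(path="config.txt"):
    with open(path, "r", encoding="utf-8") as f:
        return f.read().strip()

connect_str = load_connection_string()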
Once the Batch account is created, we need to create a Batch pool, where we select the VM size and node details. For this example, we created the pool using the Batch Explorer tool: after providing the basic details for the pool, we selected a Data Science Virtual Machine image, since it comes with Python and other software pre-installed.
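If you prefer to script the pool creation instead of clicking through Batch Explorer, a minimal sketch with the azure-batch Python SDK could look like the following. The pool name csv-loader-pool, the VM size, and the image and node-agent placeholders are assumptions for illustration; you would substitute the Data Science VM image available for Batch in your region and keep a low-priority node to control cost.
# Minimal sketch: create a Batch pool with one low-priority node (assumed names and sizes)
from azure.batch import BatchServiceClient
from azure.batch.batch_auth import SharedKeyCredentials
import azure.batch.models as batchmodels

credentials = SharedKeyCredentials("<batch account name>", "<batch account key>")
batch_client = BatchServiceClient(credentials, "https://<batch account name>.<region>.batch.azure.com")

pool = batchmodels.PoolAddParameter(
    id="csv-loader-pool",                      # assumed pool name
    vm_size="<vm size>",
    virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
        image_reference=batchmodels.ImageReference(
            publisher="<image publisher>",     # use a Data Science VM image here
            offer="<image offer>",
            sku="<image sku>",
            version="latest"),
        node_agent_sku_id="<node agent sku matching the image>"),
    target_dedicated_nodes=0,
    target_low_priority_nodes=1)               # low-priority node to keep cost down
batch_client.pool.add(pool)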
To move the data, we need a Python script that accesses Blob Storage, reads the files, and stores the data in the Azure MySQL database. The MySQL database will have two tables: a metadata table, which stores details about the flat files and their column mappings, and the destination table that receives the data.
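For reference, the sketch below creates the two tables with mysql.connector. The Metadata columns Vendor_Name and Columns_Name_Sheet are the ones the script queries later; the patient_details columns shown are placeholders, since the real destination schema depends on your data.
# Minimal sketch of the two tables used by the script (credentials and destination columns are placeholders)
import mysql.connector

conn = mysql.connector.connect(
    host="<your MySQL server>.mysql.database.azure.com",
    user="<user name>", password="<password>", database="<database name>")
cur = conn.cursor()

# Metadata table: one row per vendor column, mapping a vendor's file columns to the destination order
cur.execute("""
    CREATE TABLE IF NOT EXISTS Metadata (
        Vendor_Name VARCHAR(50),
        Columns_Name_Sheet VARCHAR(100)
    )""")

# Destination table with the default schema (illustrative columns only)
cur.execute("""
    CREATE TABLE IF NOT EXISTS patient_details (
        patient_id VARCHAR(50),
        first_name VARCHAR(100),
        last_name VARCHAR(100)
    )""")
conn.commit()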
Here is an example of the Python code to read files, connect to the database, and add the data to the table:
# Import required packages
import mysql.connector
import pandas as pd
from io import StringIO
from azure.storage.blob import BlobServiceClient

# Connect to Blob Storage
connect_str = "DefaultEndpointsProtocol=https;AccountName=<your account name>;AccountKey=<----->;EndpointSuffix=core.windows.net"
source_container_name = '<source container name>'
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# Connect to the Azure MySQL database
mydb = mysql.connector.connect(
    host="<your MySQL server>.mysql.database.azure.com",
    user="<user name>",
    password="<password>",
    database="<database name>")
mycursor = mydb.cursor()

# Read blobs from Blob Storage
blob_list = blob_service_client.get_container_client(source_container_name).list_blobs()
# Iterate through blobs
for blob in blob_list:
    # The four characters before the file extension identify the vendor
    ini_string = blob.name
    leng = ini_string.find('.')
    vendor_name = ini_string[leng-4:leng]

    # Get the mapping of source columns for this vendor from the Metadata table
    mycursor.execute(
        "SELECT group_concat(Columns_Name_Sheet) FROM Metadata WHERE Vendor_Name = %s",
        (vendor_name,))
    my_list = mycursor.fetchone()[0].split(",")

    # Download and read the CSV file
    blob_client = blob_service_client.get_blob_client(
        container=source_container_name, blob=blob.name)
    data = blob_client.download_blob().readall()
    s1 = str(data, 'utf-8')

    # Temporary fix for stray newlines in the dataset
    s = s1.replace('\r\n",', '",')
    data2 = StringIO(s)

    # Read the CSV file into a data frame (up to 71 columns)
    my_cols = [str(i) for i in range(71)]
    df = pd.read_csv(data2, sep='[:,|_]', names=my_cols, engine='python', index_col=False)
    headers = df.iloc[0]
    new_df = pd.DataFrame(df.values[1:], columns=headers)

    # Select the columns from the data frame in the order saved in MySQL
    nd2 = new_df[my_list]
    cols = "`,`".join([str(i) for i in nd2.columns.tolist()])

    # Insert the data frame records into MySQL
    for i, row in nd2.iterrows():
        sql = ("INSERT INTO `patient_details` (`" + cols + "`) VALUES ("
               + "%s," * (len(row) - 1) + "%s)")
        mycursor.execute(sql, tuple(row))
    mydb.commit()
By using the above solution and modifying the code snippets as per your requirements, you can create a pipeline to dynamically load data into an Azure MySQL database using Azure Data Factory.
Remember to install the required Python packages (azure-storage-blob, mysql-connector-python, and pandas) before running the script.
With this approach, you can efficiently handle CSV files with different schemas and automate the process of loading data into an Azure MySQL database.