Our Objective
Although the standard output of GRAX Data Lake is standards compliant and works for most use cases, occasionally you need to change the folder structure. Since the files within the History Stream folders share the same naming scheme and rely on their folder path for context, flattening the structure means the files themselves need to be renamed.
About Data Lake
GRAX Data Lake lets you drive any type of downstream consumption of backed-up or archived data with your GRAX dataset. By designating a set of objects for automated continuous export to Parquet format, you can create a valuable data source for applications like AWS Glue and other data analytics tools.
Building in Python
Python has become the de facto programming language for data analytics, data science, and machine learning. Before it gained prominence in those domains, however, the language had already developed a sizable user base as a scripting language, in part because Python comes with “batteries included”: the standard install ships with a rich suite of libraries.
Our Use Case
When History Stream writes into its destination folder it uses a structure similar to:
v2
└── org=00D8V000002YAz2UBA
    ├── object=Account
    │   ├── batch=0664f2fc0
    │   │   └── data-18fa5ba7497.parquet
    │   └── batch=0665e04a0
    │       ├── data-18fa5ba7534.parquet
    │       └── data-18fa57a5e48.parquet
    └── object=Opportunity
        ├── batch=0664f2fc0
        │   ├── data-18fa5812a45.parquet
        │   ├── data-18fa582769a.parquet
        │   └── data-18fa58423a2.parquet
        └── batch=0665e04a0
            ├── data-18fa4ad3449.parquet
            └── data-18fa6ad3292.parquet
We want to flatten it to a structure similar to:
.
└── 00D8V000002YAz2UBA
    ├── Account
    │   ├── account-0664f2fc0-18fa5ba7497.parquet
    │   ├── account-0665e04a0-18fa5ba7534.parquet
    │   └── account-0665e04a0-18fa57a5e48.parquet
    └── Opportunity
        ├── opportunity-0664f2fc0-18fa5812a45.parquet
        ├── opportunity-0664f2fc0-18fa582769a.parquet
        ├── opportunity-0664f2fc0-18fa58423a2.parquet
        ├── opportunity-0665e04a0-18fa4ad3449.parquet
        └── opportunity-0665e04a0-18fa6ad3292.parquet
The Script
TL;DR: The following script converts the folder structure from the first type to the second.
# parquet_rename.py
# Copyright (c) 2024 GRAX Inc.
# MIT License

# This is the V2 version of the parquet_rename.py script.
# This script works with the V2 version of Data Lake.

import os
import shutil

# Define your base directory where all the date folders are located.
base_directory = '/Users/mmoran/python/parquet_rename/source/v2'

# Define the directory where you want to save all the files.
destination_directory = '/Users/mmoran/python/parquet_rename/destination'

# Check if the base directory exists; if not, error out.
if not os.path.exists(base_directory):
    print(f"Directory '{base_directory}' does not exist")
    exit()

# Check if the destination directory exists; if not, error out.
if not os.path.exists(destination_directory):
    print(f"Directory '{destination_directory}' does not exist")
    exit()

# Counter for metrics.
counter = 0

# Walk through the directory structure.
# This could be done using os.walk, but this is simpler to understand and modify.
for org_folder in os.listdir(base_directory):
    org_folder_path = os.path.join(base_directory, org_folder)
    if os.path.isdir(org_folder_path):
        org_part = org_folder.split("=")[1]  # Extract the org portion for the destination dir
        for object_folder in os.listdir(org_folder_path):
            object_folder_path = os.path.join(org_folder_path, object_folder)
            if os.path.isdir(object_folder_path):
                object_part = object_folder.split("=")[1]  # Extract the object portion for the destination dir
                for batch_folder in os.listdir(object_folder_path):
                    batch_folder_path = os.path.join(object_folder_path, batch_folder)
                    if os.path.isdir(batch_folder_path):
                        batch_part = batch_folder.split("=")[1]  # Extract the batch portion for the destination dir
                        # Count the parquet files in the folder, excluding non-parquet files from the count.
                        parquet_file_count_total = len([f for f in os.listdir(batch_folder_path) if os.path.splitext(f)[1] == '.parquet'])
                        for file_name in os.listdir(batch_folder_path):  # There is usually only one file, but that is not guaranteed
                            split_file_name = os.path.splitext(file_name)
                            # Check the extension before parsing the name, so stray files like .DS_Store don't crash the split below.
                            if split_file_name[1] != ".parquet":
                                print(f"File {file_name} does not have a .parquet extension and will be skipped")
                                continue
                            id_part = split_file_name[0].split("-")[1]  # Extract the id portion for the destination file
                            file_path = os.path.join(batch_folder_path, file_name)
                            if os.path.isfile(file_path):
                                # Build the new file name; put the batch before the id so a sort by name will be in order.
                                new_file_name = f"{object_part.lower()}-{batch_part}-{id_part}.parquet"
                                new_folder_path = os.path.join(destination_directory, org_part, object_part)
                                new_file_path = os.path.join(new_folder_path, new_file_name)
                                print(new_file_path)
                                os.makedirs(new_folder_path, exist_ok=True)
                                shutil.copyfile(file_path, new_file_path)
                                counter += 1

print(f"{counter} files have been copied and renamed to {destination_directory}.")
What’s happening here?
The script has been written to be easy to understand and modify. It iterates through the directory structure created by Data Lake to flatten out the structure. Each for loop iterates through one layer of the tree, validates that the entry it's looking at is a directory, and then descends further down the tree. Along the way it also grabs the portion of each folder name needed for the destination structure.
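As noted in the script's comments, the same traversal could be done with os.walk. Here is a minimal sketch of that approach (the path is a placeholder, and it only prints what it finds rather than copying anything):

import os

base_directory = '/path/to/source/v2'  # placeholder path for illustration

for dirpath, dirnames, filenames in os.walk(base_directory):
    for file_name in filenames:
        if os.path.splitext(file_name)[1] != '.parquet':
            continue
        # At the deepest level, the last three path segments look like
        # org=..., object=..., batch=...
        segments = dirpath.split(os.sep)[-3:]
        if len(segments) == 3 and all('=' in s for s in segments):
            org_part, object_part, batch_part = (s.split('=')[1] for s in segments)
            print(org_part, object_part, batch_part, file_name)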
Once we get down to the batch folder (the deepest part of the structure), we count how many .parquet files are in the folder; it should always be one or more. We also want to skip any non-parquet files (looking at you, .DS_Store) and keep them out of the total.
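The count itself is a single list comprehension from the script that filters on the extension:

parquet_file_count_total = len([f for f in os.listdir(batch_folder_path) if os.path.splitext(f)[1] == '.parquet'])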
if split_file_name[1] != ".parquet":
    print(f"File {file_name} does not have a .parquet extension and will be skipped")
    continue
The above is a simple check to make sure the extension is .parquet. We are only testing the extension, so if your files have no extensions (or are named incorrectly), you will need to modify this section.
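If extensions can't be trusted, one option (not part of the script above) is to check the Parquet magic bytes instead: every valid Parquet file begins with the four bytes PAR1. A minimal sketch:

def looks_like_parquet(file_path):
    # Parquet files start (and end) with the 4-byte magic number b"PAR1".
    with open(file_path, 'rb') as f:
        return f.read(4) == b'PAR1'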
We then assemble the new filename, rearranging the structure so the batch ID comes before the file ID. This way, when the files are sorted by name, they will be in order from oldest to newest.
new_file_name = f"{object_part.lower()}-{batch_part}-{id_part}.parquet"
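You can sanity-check that ordering with a quick sort of sample names taken from the example above; because the batch IDs share a fixed width, a plain string sort groups the files by batch, oldest batch first:

names = [
    "account-0665e04a0-18fa57a5e48.parquet",
    "account-0664f2fc0-18fa5ba7497.parquet",
    "account-0665e04a0-18fa5ba7534.parquet",
]
print(sorted(names))
# ['account-0664f2fc0-18fa5ba7497.parquet',
#  'account-0665e04a0-18fa57a5e48.parquet',
#  'account-0665e04a0-18fa5ba7534.parquet']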
Next Steps
This script works if the source and destination are within a standard directory structure – but we know the source and destination are often in either an AWS S3 bucket or Azure blob storage. This could be handled either by modifying the script to access the storage directly (using the appropriate library) or by copying the data to a local folder before running the script and copying the results back afterwards.
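For example, if the files live in S3, the traversal could be rewritten against the S3 API. The following is a rough sketch using boto3, not a drop-in replacement: the bucket names and prefix are hypothetical, and error handling is omitted.

import os
import boto3

s3 = boto3.client('s3')
SOURCE_BUCKET = 'my-grax-datalake'       # hypothetical bucket name
DEST_BUCKET = 'my-grax-datalake-flat'    # hypothetical bucket name
PREFIX = 'v2/'

paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=SOURCE_BUCKET, Prefix=PREFIX):
    for obj in page.get('Contents', []):
        key = obj['Key']  # e.g. v2/org=.../object=Account/batch=0664f2fc0/data-18fa5ba7497.parquet
        if not key.endswith('.parquet'):
            continue
        # Split the key into its Hive-style segments and extract the values.
        _, org, object_, batch, file_name = key.split('/')
        org_part = org.split('=')[1]
        object_part = object_.split('=')[1]
        batch_part = batch.split('=')[1]
        id_part = os.path.splitext(file_name)[0].split('-')[1]
        new_key = f"{org_part}/{object_part}/{object_part.lower()}-{batch_part}-{id_part}.parquet"
        s3.copy_object(Bucket=DEST_BUCKET, Key=new_key,
                       CopySource={'Bucket': SOURCE_BUCKET, 'Key': key})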