Our Objective
Although the standard output of GRAX History Stream is standards compliant and works for most use cases, occasionally the folder structure needs to change. Because every file within the History Stream folders has the same name, flattening the folders means the files themselves must be renamed.
About History Stream
GRAX History Stream lets you drive any type of downstream consumption of backed-up or archived data with your GRAX dataset. By designating a set of objects for automated continuous export to Parquet format, you can create a valuable data source for applications like AWS Glue and further data analytics tools.
Building in Python
Python has become the de facto programming language for data analytics, data science, and machine learning. Before it gained prominence in these domains, however, the language had already built a sizable user base as a good scripting language. This is because Python comes with “batteries included”, meaning that the standard install ships with a rich suite of libraries.
Our Use Case
When History Stream writes into its destination folder it uses a structure similar to:
parquet
└── org=00D8V000002YAz2UBA
    ├── object=Account
    │   ├── day=2023-08-23
    │   │   └── hr=14
    │   │       └── data-00000.parquet
    │   └── day=2023-10-31
    │       ├── hr=03
    │       │   └── data-00000.parquet
    │       └── hr=04
    │           └── data-00000.parquet
    └── object=Opportunity
        ├── day=2023-08-23
        │   ├── hr=14
        │   │   └── data-00000.parquet
        │   ├── hr=15
        │   │   └── data-00000.parquet
        │   └── hr=22
        │       └── data-00000.parquet
        └── day=2023-10-31
            ├── hr=03
            │   └── data-00000.parquet
            └── hr=04
                └── data-00000.parquet
We want to flatten it to a structure similar to:
.
└── 00D8V000002YAz2UBA
    ├── Account
    │   ├── 2023-08-23_14_data.parquet
    │   ├── 2023-10-31_03_data.parquet
    │   └── 2023-10-31_04_data.parquet
    └── Opportunity
        ├── 2023-08-23_14_data.parquet
        ├── 2023-08-23_15_data.parquet
        ├── 2023-08-23_22_data.parquet
        ├── 2023-10-31_03_data.parquet
        └── 2023-10-31_04_data.parquet
The Script
TL;DR: The following script converts the folder structure from the first layout to the second.
# parquet_rename.py
# Copyright (c) 2023 GRAX Inc.
# MIT License
import os
import shutil
# Define your base directory where all the date folders are located.
base_directory = '/Users/mmoran/python/parquet_rename/source/parquet'
# Define the directory where you want to save all the files.
destination_directory = '/Users/mmoran/python/parquet_rename/destination'
# Check if the source directory exists; if not, error out
if not os.path.exists(base_directory):
    print(f"Directory '{base_directory}' does not exist")
    raise SystemExit(1)
# Check if the destination directory exists; if not, error out
if not os.path.exists(destination_directory):
    print(f"Directory '{destination_directory}' does not exist")
    raise SystemExit(1)
# Counter for metrics
counter = 0
# Walk through the directory structure.
# This could be done using os.walk, but this is simpler to understand and modify
for org_folder in os.listdir(base_directory):
    org_folder_path = os.path.join(base_directory, org_folder)
    if os.path.isdir(org_folder_path):
        org_part = org_folder.split("=")[1]  # Extract the portion of the org for the destination dir
        for object_folder in os.listdir(org_folder_path):
            object_folder_path = os.path.join(org_folder_path, object_folder)
            if os.path.isdir(object_folder_path):
                object_part = object_folder.split("=")[1]  # Extract the portion of the object for the destination dir
                for date_folder in os.listdir(object_folder_path):
                    date_folder_path = os.path.join(object_folder_path, date_folder)
                    if os.path.isdir(date_folder_path):
                        date_part = date_folder.split("=")[1]  # Extract the portion of the date for the destination dir
                        for hour_folder in os.listdir(date_folder_path):
                            hour_folder_path = os.path.join(date_folder_path, hour_folder)
                            if os.path.isdir(hour_folder_path):
                                hour_part = hour_folder.split("=")[1]  # Extract the portion of the hour for the destination dir
                                # Count the parquet files in the folder, excluding non-parquet files from the count
                                parquet_file_count_total = len([f for f in os.listdir(hour_folder_path) if os.path.splitext(f)[1] == '.parquet'])
                                for file_name in os.listdir(hour_folder_path):  # There is usually only one file, but that is not guaranteed
                                    if os.path.splitext(file_name)[1] != ".parquet":
                                        print(f"File {file_name} does not have a .parquet extension and will be skipped")
                                        continue
                                    file_path = os.path.join(hour_folder_path, file_name)
                                    if os.path.isfile(file_path):
                                        # If there are multiple parquet files use the full filename, otherwise simplify
                                        if parquet_file_count_total > 1:
                                            file_part = file_name
                                        else:
                                            file_part = "data.parquet"
                                        new_file_name = f"{date_part}_{hour_part}_{file_part}"
                                        new_folder_path = os.path.join(destination_directory, org_part, object_part)
                                        new_file_path = os.path.join(new_folder_path, new_file_name)
                                        print(new_file_path)
                                        os.makedirs(new_folder_path, exist_ok=True)
                                        shutil.copyfile(file_path, new_file_path)
                                        counter += 1
print(f"{counter} files have been copied and renamed to {destination_directory}.")
What’s happening here?
The script has been written to be easy to understand and modify. It iterates through the directory structure created by History Stream and flattens it. Each for loop iterates through one layer of the tree: it checks that the entry it is looking at is a directory and then descends one level further. Along the way it extracts the value after the "=" in each folder name (the org, object, day, and hour) to build the destination path and file name.
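A comment in the script notes that the traversal could also be written more compactly. As a minimal sketch of that idea, the version below replaces the four nested loops with a single pathlib glob pattern (a swap for the os.walk approach the comment mentions, chosen here because the depth of the tree is fixed). The function name flatten_history_stream and its parameters are illustrative and not part of the original script; the naming rule is intended to mirror the script's.

```python
import shutil
from pathlib import Path


def flatten_history_stream(source: Path, destination: Path) -> int:
    """Copy every parquet file under `source` into the flattened layout.

    Hypothetical helper: returns the number of files copied.
    """
    copied = 0
    # One glob pattern stands in for the four nested loops: org/object/day/hr.
    for hour_dir in sorted(source.glob("org=*/object=*/day=*/hr=*")):
        if not hour_dir.is_dir():
            continue
        # Folder names look like "key=value"; keep only the value.
        org, obj, day, hour = (
            part.split("=", 1)[1] for part in hour_dir.relative_to(source).parts
        )
        # Only real files with a .parquet extension are considered.
        parquet_files = sorted(
            f for f in hour_dir.iterdir() if f.is_file() and f.suffix == ".parquet"
        )
        for parquet_file in parquet_files:
            # Keep the full name only when the hour folder holds several files.
            file_part = parquet_file.name if len(parquet_files) > 1 else "data.parquet"
            target_dir = destination / org / obj
            target_dir.mkdir(parents=True, exist_ok=True)
            shutil.copyfile(parquet_file, target_dir / f"{day}_{hour}_{file_part}")
            copied += 1
    return copied
```

The trade-off is that the glob version is shorter but less convenient to modify layer by layer, which is the reason the script above spells out each loop.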
Once we get down to the hour folder (the deepest part of the structure), we count how many .parquet files it contains. In most cases there is one file per folder, but a mass change can produce more. To keep the names simple, we drop the run of zeros (00000) from the file name when there is only one file, and keep the full original name when there are several. We also skip any non-parquet files (looking at you, .DS_Store) and leave them out of the count.
parquet_file_count_total = len([f for f in os.listdir(hour_folder_path) if os.path.splitext(f)[1] == '.parquet'])
The above looks a little tricky, but it’s actually a straightforward list comprehension. If we read through it logically:
- Count the elements in the following:
  - Create a list of the objects
    - in the hour_folder_path AND
    - that have an extension of .parquet
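For readers who find the one-liner hard to parse, the same count can be written as an explicit loop. This is only an equivalent sketch; the function name count_parquet_files is hypothetical and does not appear in the script.

```python
import os


def count_parquet_files(folder: str) -> int:
    """Count the entries in `folder` whose extension is exactly .parquet."""
    count = 0
    for name in os.listdir(folder):            # every entry in the folder ...
        extension = os.path.splitext(name)[1]  # "data-00000.parquet" -> ".parquet"
        if extension == ".parquet":            # ... that has a .parquet extension
            count += 1
    return count
```

Note that os.path.splitext treats a leading dot as part of the file name, so a file such as .DS_Store has an empty extension and is never counted.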
Next Steps
This script works when the source and destination are both in a standard directory structure, but we know they often live in an AWS S3 bucket or Azure Blob Storage instead. You can handle this either by modifying the script to access the storage directly (using the appropriate library) or by copying the files to a local folder before running the script and copying the results back to cloud storage afterwards.
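As a rough sketch of the first option for S3, the function below renames objects by copying them to flattened keys within a single bucket. It assumes s3_client is a boto3 S3 client (for example, one created with boto3.client("s3")) and uses that client's list_objects_v2 paginator and copy_object call. The function name flatten_s3_prefix, its parameters, and the single-bucket layout are assumptions made for illustration. Unlike the script above, this sketch always keeps the original file name rather than shortening it to data.parquet.

```python
def flatten_s3_prefix(s3_client, bucket, source_prefix, dest_prefix):
    """Server-side copy of parquet objects into the flattened key layout.

    Hypothetical helper: `s3_client` is assumed to be a boto3 S3 client.
    Returns the list of destination keys that were written.
    """
    written = []
    dest_root = dest_prefix.rstrip("/")
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=source_prefix):
        for item in page.get("Contents", []):
            key = item["Key"]
            if not key.endswith(".parquet"):
                continue
            # Expect <prefix>/org=X/object=Y/day=Z/hr=H/<file>; skip anything else.
            parts = key[len(source_prefix):].strip("/").split("/")
            if len(parts) != 5 or not all("=" in p for p in parts[:4]):
                continue
            org, obj, day, hour = (p.split("=", 1)[1] for p in parts[:4])
            new_key = f"{dest_root}/{org}/{obj}/{day}_{hour}_{parts[4]}"
            s3_client.copy_object(
                Bucket=bucket,
                Key=new_key,
                CopySource={"Bucket": bucket, "Key": key},
            )
            written.append(new_key)
    return written
```

A single copy_object call is limited to objects of up to 5 GB, so very large files would need a multipart copy instead. Azure Blob Storage would need the equivalent calls from its own SDK.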