Manipulating the History Stream Folder Structure

Our Objective

Although the standard output of GRAX History Stream is standards compliant and works for most use cases, occasionally the folder structure needs to change. Because every file within the History Stream folders has the same name, flattening the structure also means renaming the files themselves.

About History Stream

GRAX History Stream lets you drive any type of downstream consumption of backed-up or archived data with your GRAX dataset. By designating a set of objects for automated continuous export to Parquet format, you can create a valuable data source for applications like AWS Glue and further data analytics tools.

Building in Python

Python has become the de facto programming language when it comes to data analytics, data science, and machine learning. However, before it gained prominence in these domains the language had developed a sizable user base as a good scripting language. This is because Python comes with “batteries included”, meaning that the standard install comes with a rich suite of libraries.

Our Use Case

When History Stream writes into its destination folder it uses a structure similar to:

└── org=00D8V000002YAz2UBA
    ├── object=Account
    │   ├── day=2023-08-23
    │   │   └── hr=14
    │   │       └── data-00000.parquet
    │   └── day=2023-10-31
    │       ├── hr=03
    │       │   └── data-00000.parquet
    │       └── hr=04
    │           └── data-00000.parquet
    └── object=Opportunity
        ├── day=2023-08-23
        │   ├── hr=14
        │   │   └── data-00000.parquet
        │   ├── hr=15
        │   │   └── data-00000.parquet
        │   └── hr=22
        │       └── data-00000.parquet
        └── day=2023-10-31
            ├── hr=03
            │   └── data-00000.parquet
            └── hr=04
                └── data-00000.parquet

We want to flatten it to a structure similar to:

└── 00D8V000002YAz2UBA
    ├── Account
    │   ├── 2023-08-23_14_data.parquet
    │   ├── 2023-10-31_03_data.parquet
    │   └── 2023-10-31_04_data.parquet
    └── Opportunity
        ├── 2023-08-23_14_data.parquet
        ├── 2023-08-23_15_data.parquet
        ├── 2023-08-23_22_data.parquet
        ├── 2023-10-31_03_data.parquet
        └── 2023-10-31_04_data.parquet
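
The mapping between the two layouts can be sketched as a small pure function. This is only an illustration: the name `flattened_path` is hypothetical, and it assumes a single file per hour folder and that every folder follows the `key=value` pattern shown above.

```python
from pathlib import PurePosixPath


def flattened_path(source_relative_path):
    """Map 'org=X/object=Y/day=D/hr=H/<file>' to 'X/Y/D_H_data.parquet'.

    Assumes one file per hour folder (an assumption of this sketch).
    """
    org, obj, day, hour, _file_name = PurePosixPath(source_relative_path).parts
    # Each folder is named key=value; keep only the value after the "=".
    org, obj, day, hour = (part.split("=", 1)[1] for part in (org, obj, day, hour))
    return str(PurePosixPath(org, obj, f"{day}_{hour}_data.parquet"))


print(flattened_path(
    "org=00D8V000002YAz2UBA/object=Account/day=2023-08-23/hr=14/data-00000.parquet"
))
# prints: 00D8V000002YAz2UBA/Account/2023-08-23_14_data.parquet
```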

The Script

TL;DR: The following script converts the folder structure from the first layout to the second.

# Copyright (c) 2023 GRAX Inc.
# MIT License

import os
import shutil

# Define your base directory, where the org=... folders are located.
base_directory = '/Users/mmoran/python/parquet_rename/source/parquet'

# Define the directory where you want to save all the files.
destination_directory = '/Users/mmoran/python/parquet_rename/destination'

# Check that the source directory exists; if not, exit with an error
if not os.path.exists(base_directory):
    raise SystemExit(f"Directory '{base_directory}' does not exist")

# Check that the destination directory exists; if not, exit with an error
if not os.path.exists(destination_directory):
    raise SystemExit(f"Directory '{destination_directory}' does not exist")

# Counter for metrics
counter = 0

# Walk through the directory structure. 
# This could be done using os.walk, but this is simpler to understand and modify
for org_folder in os.listdir(base_directory):
    org_folder_path = os.path.join(base_directory,org_folder)
    if os.path.isdir(org_folder_path):
        org_part = org_folder.split("=")[1] # Extract the portion of the org for the destination dir
        for object_folder in os.listdir(org_folder_path):
            object_folder_path = os.path.join(org_folder_path,object_folder)
            if os.path.isdir(object_folder_path):
                object_part = object_folder.split("=")[1] # Extract the portion of the object for the destination dir
                for date_folder in os.listdir(object_folder_path): 
                    date_folder_path = os.path.join(object_folder_path, date_folder) 
                    if os.path.isdir(date_folder_path):  
                        date_part = date_folder.split("=")[1] # Extract the portion of the date for the destination dir
                        for hour_folder in os.listdir(date_folder_path):
                            hour_folder_path = os.path.join(date_folder_path, hour_folder) 
                            if os.path.isdir(hour_folder_path):  
                                hour_part = hour_folder.split("=")[1] # Extract the portion of the hour for the destination dir
                                #The following line counts the number of parquet files in the folder and excludes the non-parquet in the count
                                parquet_file_count_total = len([f for f in os.listdir(hour_folder_path) if os.path.splitext(f)[1] == '.parquet'])
                                for file_name in os.listdir(hour_folder_path):  #There is usually only one file, but that is not guaranteed 
                                    if os.path.splitext(file_name)[1] != ".parquet":
                                        print(f"File {file_name} does not have a .parquet extension and will be skipped")
                                        continue  # Skip non-parquet files instead of copying them
                                    file_path = os.path.join(hour_folder_path, file_name)
                                    if os.path.isfile(file_path):
                                        # If there are multiple parquet files, use the full filename; otherwise simplify
                                        if parquet_file_count_total > 1:
                                            file_part = file_name
                                        else:
                                            file_part = "data.parquet"
                                        new_file_name = f"{date_part}_{hour_part}_{file_part}"
                                        new_folder_path = os.path.join(destination_directory,org_part,object_part)
                                        new_file_path = os.path.join(new_folder_path, new_file_name)
                                        os.makedirs(new_folder_path, exist_ok=True)
                                        shutil.copyfile(file_path, new_file_path)
                                        counter += 1  # Count each copied file for the final summary

print(f"{counter} files have been copied and renamed to {destination_directory}.")

What’s happening here?

The script has been written to be easy to understand and modify. It iterates through the directory structure created by History Stream and flattens it. Each for loop iterates through one layer of the tree. It checks that the entry it's iterating over is a directory and then descends further down the tree. As it does so, it also grabs the portion of the folder name (the value after the "=") for the destination structure.
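
For comparison, the same traversal can be written more compactly with a glob pattern. The sketch below is an alternative, not the script's own method: the function name `find_parquet_files` is hypothetical, and it assumes every level follows the `org=`/`object=`/`day=`/`hr=` naming shown earlier. It only lists the files and their extracted parts; it does not copy anything.

```python
from pathlib import Path


def find_parquet_files(base_directory):
    """Yield (org, object, day, hour, path) for each .parquet file found.

    Assumes the org=/object=/day=/hr= layout described in this article.
    """
    base = Path(base_directory)
    pattern = "org=*/object=*/day=*/hr=*/*.parquet"
    for path in sorted(base.glob(pattern)):
        if not path.is_file():
            continue  # Ignore directories that happen to match the pattern
        # The first four path components below the base are key=value folders.
        folder_parts = path.relative_to(base).parts[:4]
        org, obj, day, hour = (part.split("=", 1)[1] for part in folder_parts)
        yield org, obj, day, hour, path
```

The nested loops in the script are easier to modify level by level; the glob version trades that flexibility for brevity, and it never visits files such as .DS_Store because they do not match the pattern.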

Once we reach the hour folder (the deepest level of the structure), we count how many .parquet files it contains. In most cases there is one file per folder, but a mass change could produce more. To keep the names simple, we don't want a run of zeros (00000) on every file when there is only one. We also want to exclude any non-parquet files (looking at you, .DS_Store) and keep them out of the count.

parquet_file_count_total = len([f for f in os.listdir(hour_folder_path) if os.path.splitext(f)[1] == '.parquet'])

The above looks a little tricky, but it’s actually a straightforward list comprehension. If we read through it logically:

  • Count the elements in the following:
    • Create a list of the objects
      • in the hour_folder_path AND
      • that have an extension of .parquet
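
For readers less used to list comprehensions, the same count can be written as an explicit loop. This is an equivalent sketch; the function name `count_parquet_files` is introduced here purely for illustration.

```python
import os


def count_parquet_files(folder_path):
    """Count the entries in folder_path whose extension is .parquet."""
    count = 0
    for name in os.listdir(folder_path):
        # splitext returns (root, extension); for ".DS_Store" the extension is ""
        extension = os.path.splitext(name)[1]
        if extension == ".parquet":
            count += 1
    return count
```

Calling `count_parquet_files(hour_folder_path)` would give the same value as `parquet_file_count_total` in the script.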

Next Steps

This script works if the source and destination are within a standard directory structure, but we know the source and destination are often in either an AWS S3 bucket or Azure Blob Storage. This could be handled either by modifying the script to access the storage directly (using the appropriate library) or by copying the data to a local folder before running the script and uploading the results afterward.
