4 min read

ETL - Pipeline with join2

1 Introduction

Let us come to another variant of an ETL. “Last time” I prepared the two data sets separately and then merged them. This time I will again load two data sets, edit one of them so that a join becomes possible, merge them and edit the result further. Finally I want to save the final data set.

Overview of the ETL steps:

At the end I will test the created ETL, first from another Jupyter notebook and then via a fully automatic call from the command line.

For this post I use two specially created sample data sets. A copy of them is stored in my “GitHub Repo”.
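
Roughly sketched, the two files look like this (the column names are taken from the code below; the concrete values and the prefix in country_names are only illustrative assumptions on my part):

Countries.csv:

Countries, Population
Germany, 83000000
France, 67000000

Countries_metadata.csv:

country_names, Land_Area
Land.Germany, 357000
Land.France, 643000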

2 Setup

There is not much to add to the setup. In the two previous posts I have already explained it in detail.
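
As a reminder, the relative paths used below assume a folder structure roughly like this (my assumption, derived from the paths used in the code):

data/
    input/
        Countries.csv
        Countries_metadata.csv
    output/
    etl_pipeline.py
notebooks/
    Test ETL Pipeline with join2.ipynb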

3 ETL Pipeline with join2

import pandas as pd

3.1 Extract

countries = pd.read_csv('../data/input/Countries.csv')
countries

countries_metadata = pd.read_csv('../data/input/Countries_metadata.csv')
countries_metadata

3.2 Transform

3.2.1 Joining

df = pd.merge(countries, countries_metadata, left_on='Countries', right_on='country_names', how='left')
df

As we can see, the join did not work: the two data sets do not use a uniform spelling for the country names, so no rows from countries_metadata are matched and the merged columns are filled with missing values.
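
A quick way to confirm this (a purely hypothetical check, not part of the pipeline) is to intersect the two key columns directly:

set(countries['Countries']) & set(countries_metadata['country_names'])
# -> set(), because the names in countries_metadata still carry their prefix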

3.2.2 Pre-process countries_metadata for joining

# keep only the part of the name after the dot so that it matches the 'Countries' column
countries_metadata.country_names = countries_metadata.country_names.map(lambda x: x.split('.')[1])
countries_metadata
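
By the way, the same step can also be written with pandas’ vectorized .str accessor instead of the .map call above; a minimal equivalent sketch:

countries_metadata['country_names'] = countries_metadata['country_names'].str.split('.').str[1]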

df = pd.merge(countries, countries_metadata, left_on='Countries', right_on='country_names', how='left')
df

3.2.3 Cleaning

df = df.drop(['country_names'], axis=1)
df

3.2.4 Add further calculations

df['pop_density'] = df['Population']/df['Land_Area']
df.head()

df['Population'] = df['Population']/1000
df['Land_Area'] = df['Land_Area']/1000

df.head()

df = df.rename(columns={'Population':'Population_per_k', 'Land_Area':'Land_Area_per_k'})
df

3.3 Load

df.to_csv('../data/output/new_df.csv', index=False)
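
If you want to check the export directly, you can simply read the file back in (a purely optional sanity check):

pd.read_csv('../data/output/new_df.csv').head()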

4 Create etl_pipeline.py

import pandas as pd
import numpy as np
import os


class DataPreprocessor:
    def __init__(self, path_folder = "path/to/data"):

        self.path_folder = path_folder
        
        # Path to input
        self.path_input_folder = "{}/input/".format(path_folder)
        self.path_input_countries = self.path_input_folder + 'Countries.csv'
        self.path_input_countries_metadata = self.path_input_folder + 'Countries_metadata.csv'

        # Path on which output tables are saved
        self.path_output_folder = "{}/output/".format(path_folder)
        self.path_output_countries = self.path_output_folder + 'Countries.csv'
        self.path_output_countries_metadata = self.path_output_folder + 'Countries_metadata.csv'
        self.path_output_new_df = self.path_output_folder + 'new_df.csv'

        # create dictionaries for read dtypes
        self.read_dtypes_countries = {'Countries':'category'}
        self.read_dtypes_countries_metadata = {'country_names':'category'}
        self.read_dtypes_new_df = {'Countries':'category'}

        # create folders for output if not existent yet
        if not os.path.exists(self.path_output_folder):
            os.makedirs(self.path_output_folder) 

    def read_data_from_raw_input(self):

        print("Start:\tRead in countries Dataset")
        self.countries = pd.read_csv(self.path_input_countries, dtype=self.read_dtypes_countries)
        print("Finish:\tRead in countries Dataset")

        print("Start:\tRead in countries_metadata Dataset")       
        self.countries_metadata = pd.read_csv(self.path_input_countries_metadata, dtype=self.read_dtypes_countries_metadata)
        print("Finish:\tRead in countries_metadata Dataset")

    def preprocess_data(self, save_preprocess_countries=False, save_preprocess_countries_metadata=False, save_preprocess_new_df=True):

        print("Start:\tPreprocessing countries_metadata Dataset")
        self.preprocess_countries_metadata()
        print("Finish:\tPreprocessing countries_metadata Dataset")

        self.new_df = pd.merge(self.countries, self.countries_metadata, left_on='Countries', right_on='country_names', how='left')

        self.preprocess_new_df()

#        print("Start:\tPreprocessing countries Dataset")
#        self.preprocess_countries()
#        print("Finish:\tPreprocessing countries Dataset")

        if save_preprocess_countries:
            print("Start:\tSave countries Dataset to disc")
            self.countries.to_csv(self.path_output_countries, index=False)
            print("Finish:\tSave countries Dataset to disc")

        if save_preprocess_countries_metadata:
            print("Start:\tSave countries_metadata Dataset to disc")
            self.countries_metadata.to_csv(self.path_output_countries_metadata, index=False)
            print("Finish:\tSave countries_metadata Dataset to disc")

        if save_preprocess_new_df:
            print("Start:\tSave new_df Dataset to disc")
            self.new_df.to_csv(self.path_output_new_df, index=False)
            print("Finish:\tSave new_df Dataset to disc")

        return self.countries, self.countries_metadata, self.new_df

    def preprocess_countries_metadata(self):

        # keep only the part of the name after the dot
        # so that it matches the 'Countries' column of the countries dataset
        self.countries_metadata.country_names = self.countries_metadata.country_names.map(lambda x: x.split('.')[1])

    def preprocess_new_df(self):

        # drop the now redundant join key, then add the derived metrics
        self.new_df = self.new_df.drop(['country_names'], axis=1)
        self.new_df['pop_density'] = self.new_df['Population']/self.new_df['Land_Area']
        self.new_df['Population'] = self.new_df['Population']/1000
        self.new_df['Land_Area'] = self.new_df['Land_Area']/1000
        self.new_df = self.new_df.rename(columns={'Population': 'Population_per_k', 'Land_Area': 'Land_Area_per_k'})

        
    def read_preprocessed_tables(self):
        
        print("Start:\tRead in modified Dataset")
        self.new_df = pd.read_csv(self.path_output_new_df, dtype=self.read_dtypes_new_df)
        print("Finish:\tRead in modified Dataset")

        return self.new_df
             

def main():

    # the script is executed from inside the data folder (see section 5.2),
    # so the data folder itself is the current working directory
    datapreprocessor = DataPreprocessor(path_folder='.')
    datapreprocessor.read_data_from_raw_input()
    datapreprocessor.preprocess_data()
    print('ETL has been successfully completed!!')

#if __name__ == '__main__':
#    main()

Here in the post the call of main is commented out with ‘#’. In the actual .py file, of course, these two lines must not be commented out.
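
Uncommented, the end of the etl_pipeline.py file therefore looks like this:

if __name__ == '__main__':
    main()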

We no longer call self.preprocess_countries() (it only remains as a comment), because countries does not need any preprocessing of its own here; we continue to work with new_df after the join. The option to save the loaded file Countries.csv as we loaded it remains, because you might want to have all relevant data sets in the output folder, as shown in the example below.
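
If, for example, you want copies of both input tables next to new_df in the output folder, you can set the corresponding flags when calling preprocess_data (the parameters are exactly those defined in the class above):

countries, countries_metadata, new_df = datapreprocessor.preprocess_data(
    save_preprocess_countries=True,
    save_preprocess_countries_metadata=True,
    save_preprocess_new_df=True)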

Unfortunately the display here is not optimal, so I recommend copying the syntax into a .py file. I prefer “Visual Studio Code” from Microsoft for this. I have also put the finished file in my “GitHub Repo”, from where you can get it.

5 Test etl_pipeline.py

Now we want to test our created ETL.

5.1 from jupyter notebook

First I want to test the ETL from a notebook. For this we create and start a new notebook named ‘Test ETL Pipeline with join2.ipynb’ in the notebooks folder.

import sys

# Add the folder in which etl_pipeline.py is located to the module search path.
sys.path.insert(1, '../data')
import etl_pipeline

# The notebook lives in the notebooks folder, so the data folder is one level up.
datapreprocessor = etl_pipeline.DataPreprocessor(path_folder='../data')
datapreprocessor.read_data_from_raw_input()

countries, countries_metadata, new_df = datapreprocessor.preprocess_data(save_preprocess_new_df=True)

new_df = datapreprocessor.read_preprocessed_tables()

new_df

5.2 from command line

Because we have defined a main in the .py file, we can call and execute the ETL directly from the command line. It does not matter which command line you use exactly: the Anaconda PowerShell, Git Bash, the Windows command line or anything else.

Just type the following commands in your command prompt:

cd "path/to/your/data/folder"
python etl_pipeline.py

6 Conclusion

In this variant of an ETL I showed how to load two data sets, edit one of them so that a join becomes possible, merge them, process the result further and finally save the final data set.