Build a Pipeline for Transferring Data from S3 to Redshift with Apache Airflow

Ahmed Mokbel
6 min read · Jul 5, 2024

Apache Airflow, a powerful workflow orchestration tool, simplifies the management of complex data pipelines. In this article, we will guide you through the process of building a data pipeline that transfers data from a data source to Amazon S3, and then to Amazon Redshift using Apache Airflow.

We will break down the pipeline into three phases:

  • Loading Data from the Source
  • Transferring Data to S3
  • Transferring Data from S3 to Redshift

Phase 1: Load Data from Source

The first phase involves extracting data from the source system. This could be a database, an API, or any other data source. We’ll use Airflow Python operators to fetch the data and prepare it for transfer to S3.

import pyarrow as pa


def customers(conn_str, ds: str, ds_nodash: str, dag_id: str, **kwargs):
    # Pull a daily snapshot of the customers table for the logical date
    df = conn_str.get_pandas_df(
        f"""
        with transformed_customers as (
            select
                id,
                creation_date,
                customer_fullname,
                customer_city,
                customer_address
            from customers
        )
        select * from transformed_customers
        where creation_date = '{ds}';
        """
    )

    # Match DataFrame columns to the corresponding Redshift column types
    schema = pa.schema(
        [
            pa.field("id", pa.string()),
            pa.field("creation_date", pa.date32()),
            pa.field("customer_fullname", pa.string()),
            pa.field("customer_city", pa.string()),
            pa.field("customer_address", pa.string()),
        ]
    )

    print("Size Of DataFrame----->", df.shape)
    # "path_files" is assumed to be a module-level path template (e.g. "/opt/airflow/data/{}/{}")
    file_dir = path_files.format(dag_id, f"customers_{ds_nodash}.pqt")
    df.to_parquet(file_dir, schema=schema, index=False)
    # Share the local file path with downstream tasks via XCom
    kwargs["ti"].xcom_push(key=dag_id, value=file_dir)


def branch(conn_str, dag_id: str, ds_nodash: str, **kwargs):
    # Pull the full branch table (it is replaced every day)
    df = conn_str.get_pandas_df(
        """
        with transformed_branch as (
            select
                id,
                creation_date,
                branch_name,
                branch_city,
                branch_code
            from branch
        )
        select * from transformed_branch;
        """
    )

    schema = pa.schema(
        [
            pa.field("id", pa.string()),
            pa.field("creation_date", pa.date32()),
            pa.field("branch_name", pa.string()),
            pa.field("branch_city", pa.string()),
            pa.field("branch_code", pa.string()),
        ]
    )

    print("Size Of DataFrame----->", df.shape)
    file_dir = path_files.format(dag_id, f"branch_{ds_nodash}.pqt")
    df.to_parquet(file_dir, schema=schema, index=False)
    kwargs["ti"].xcom_push(key=dag_id, value=file_dir)

Extracting Data from Two Different Tables: Customers and Branches

In this section, we will explain how to extract data from two different tables, customers and branches, using Apache Airflow. We'll walk through the customers function parameters:

  • conn_str: A database hook object (for example, PostgresHook) used to connect to the source database and run the extraction query; despite the name, it is a hook rather than a plain connection string.
  • ds: A template field in Airflow that provides the DAG run's logical date in the format YYYY-MM-DD. This is used in the SQL query to get a snapshot of the customers table for the current date minus one day.
  • ds_nodash: A template field in Airflow that provides the DAG run's logical date without dashes (YYYYMMDD). This is used to prepare the file name for the snapshot.
  • dag_id: A template field in Airflow that provides the ID of the DAG. This is used to create a dynamic folder with the same name as the DAG ID to save the DataFrame as a Parquet file.
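As a quick illustration, here is a minimal sketch (assuming Airflow 2.x context injection) of how these values reach the customers callable: ds and ds_nodash are injected from the task context because they match parameters in the function signature, while conn_str and the templated dag_id are supplied through op_kwargs, just as the DAG code later in this article does.

# Minimal sketch, assuming Airflow 2.x: context variables such as ds and ds_nodash
# are injected automatically because they match the callable's parameters, while
# op_kwargs supplies the database hook and the Jinja-templated dag_id.
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

read_customers_task = PythonOperator(
    task_id="read_from_snp_customers_task",
    python_callable=customers,                 # the function defined above
    op_kwargs={
        "conn_str": PostgresHook("pos_conn"),  # a hook object, not a plain string
        "dag_id": "{{ dag.dag_id }}",          # rendered at runtime
    },
)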

Inside the function body:

Schema Definition: A PyArrow schema is defined to match DataFrame columns with corresponding Redshift data types, ensuring data quality and compatibility.

Save DataFrame to Parquet: The DataFrame size is printed for debugging. The DataFrame is saved as a Parquet file at a specified location with the defined schema.

Push File Path to XCom: The file path is pushed to Airflow’s XCom for use by other tasks in the DAG.

We didn’t use ds in the branch query because the customers table is append-only (new records are added daily), whereas the branch table is completely replaced each day to reflect the most up-to-date data.

Phase 2: Transfer Data to S3

In the second phase, we transfer the extracted data to Amazon S3 for storage. This involves uploading the locally saved Parquet file to an S3 bucket using the LocalFilesystemToS3Operator.

If we have multiple tables, such as two in this example, we need separate operators for each one, like LocalFilesystemToS3Operator for customers and another for branch. However, if we have more tables, such as five or more, writing individual operators for each table would be inefficient. Instead, we can create a function that generates the necessary profile for each table and use a loop to apply the same operator for all tables. This approach streamlines the process and scales better for managing multiple tables.

from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook


def generate_tables_profile():
    tables_profile = {
        "snp_customers": {
            "upsert_keys": ["id"],
            "replace": False,
            "function": customers,
            "params": {
                "conn_str": PostgresHook("pos_conn"),
                "dag_id": "{{ dag.dag_id }}",
            },
        },
        "snp_branch": {
            "upsert_keys": None,
            "replace": True,
            "function": branch,
            "params": {
                "conn_str": MySqlHook("mysql_conn"),
                "dag_id": "{{ dag.dag_id }}",
            },
        },
    }

    return tables_profile

This function builds a profile for each table: the extraction function to call, its parameters, the upsert keys, and whether the data should be replaced in S3 and Redshift.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
from airflow.utils.task_group import TaskGroup

# slack_failed_callback is a custom alerting callback defined elsewhere in the
# project; dest_key_folder is shown further below.

default_args = {
    "owner": "Ahmed",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=30),
}

with DAG(
    's3_to_redshift_pipeline',
    default_args=default_args,
    description='A simple data pipeline to transfer data from S3 to Redshift',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    tables_profile = generate_tables_profile()
    for table_name, profile in tables_profile.items():

        with TaskGroup(group_id=f"{table_name}_group") as tg:
            read_from_db_task = PythonOperator(
                task_id=f"read_from_{table_name}_task",
                python_callable=profile["function"],
                op_kwargs=profile["params"],
                provide_context=True,
                on_failure_callback=slack_failed_callback,
                dag=dag,
            )

            upload_to_s3_operator_task = LocalFilesystemToS3Operator(
                task_id=f"write_{table_name}_To_S3",
                filename="{{ ti.xcom_pull(task_ids=params.task, key=dag.dag_id) }}",
                dest_key=dest_key_folder(table_name, profile["replace"]),
                dest_bucket="datamart_pod",
                aws_conn_id="AWS_CONN_S3_TO_REDSHIFT",
                replace=profile["replace"],
                params={
                    "task": f"{table_name}_group.read_from_{table_name}_task",
                },
                on_failure_callback=slack_failed_callback,
                dag=dag,
            )

Let’s explain the LocalFilesystemToS3Operator parameters:

  • task_id: A unique identifier for the task.
  • filename: Retrieves the local file path from XCom, where the previous task stored it.
  • dest_key: The destination key (path) in the S3 bucket.

The dest_key_folder function generates an S3 destination key based on the replace_flag for the table. If replace_flag is False, the path includes year, month, and day to structure the files by date, like this: datamart_pod/customers/2024/06/03/customers_20240603.pqt.

If replace_flag is True, the path is simpler and does not include the date, following the format bucket_name/folder_name/file.

def dest_key_folder(table: str, replace_flag: bool):
    # Build the S3 destination key; dest_key is a templated field, so the Jinja
    # expressions below are rendered at runtime.
    if replace_flag:
        return "{}/{}/{}".format(
            'datamart-pod',
            table,
            "{{ ti.xcom_pull(task_ids=params.task, key=dag.dag_id).split('/')[-1] }}",
        )
    else:
        return "{}/{}/{}/{}/{}/{}".format(
            'datamart-pod',
            table,
            "{{ ds.split('-')[0] }}",
            "{{ ds.split('-')[1] }}",
            "{{ ds.split('-')[2] }}",
            "{{ ti.xcom_pull(task_ids=params.task, key=dag.dag_id).split('/')[-1] }}",
        )
  • dest_bucket: The S3 bucket name.
  • aws_conn_id: AWS connection ID configured in Airflow.
  • replace: Indicates whether to replace the file in S3 if it already exists.
  • params: Parameters passed to the task, including the task ID for the previous task.

Phase 3: Transfer Data from S3 to Redshift

In the final phase, we transfer the data stored in Amazon S3 to Amazon Redshift for further analysis and processing. This involves using Airflow’s S3ToRedshiftOperator to copy the data efficiently from S3 to Redshift.

Inside the same loop and task group, we add this operator:

            # Requires: from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
            transfer_to_redshift_task = S3ToRedshiftOperator(
                task_id=f"transfer_{table_name}_to_redshift",
                redshift_conn_id="Redshift_dm_conn",
                s3_bucket="datamart_pod",
                s3_key=s3_key_template(profile["replace"]),
                schema="eng",
                aws_conn_id="AWS_CONN_S3_TO_REDSHIFT",
                table=f"{table_name}",
                copy_options=["FORMAT AS PARQUET", "FILLRECORD"],
                method=rebuild_method(profile["replace"]),
                upsert_keys=profile["upsert_keys"],
                params={
                    "task": f"{table_name}_group.read_from_{table_name}_task",
                    "table": table_name,
                },
                on_failure_callback=slack_failed_callback,
                dag=dag,
            )

        read_from_db_task >> upload_to_s3_operator_task >> transfer_to_redshift_task
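The s3_key_template helper is referenced here but not shown in the original snippets. A minimal sketch of what it could look like, assuming it simply mirrors the key layout produced by dest_key_folder and reads the table name from the operator’s params, is:

# Hypothetical sketch of s3_key_template (not shown in the original code): it
# rebuilds the same key that dest_key_folder produced for the upload step,
# again relying on Jinja templating since s3_key is a templated field.
def s3_key_template(replace_flag: bool):
    file_name = "{{ ti.xcom_pull(task_ids=params.task, key=dag.dag_id).split('/')[-1] }}"
    if replace_flag:
        return "{}/{}/{}".format('datamart-pod', "{{ params.table }}", file_name)
    return "{}/{}/{}/{}/{}/{}".format(
        'datamart-pod',
        "{{ params.table }}",
        "{{ ds.split('-')[0] }}",
        "{{ ds.split('-')[1] }}",
        "{{ ds.split('-')[2] }}",
        file_name,
    )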

So how does it work?

Parameters

  • redshift_conn_id: The Airflow connection ID for Redshift.
  • s3_bucket: The name of the S3 bucket.
  • s3_key: The S3 key (path) to the data file. It is generated by the s3_key_template function (sketched above), which uses the replace flag from the profile to determine the key structure.
  • schema: The Redshift schema where the table resides.
  • aws_conn_id: The Airflow connection ID for AWS credentials.
  • table: The name of the Redshift table where the data will be loaded. It uses the table name from the profile.
  • copy_options: A list of options for the Redshift COPY command. In this case, it tells COPY to read the data as Parquet and to fill columns that are missing from the file with NULLs (i.e., ["FORMAT AS PARQUET", "FILLRECORD"]).
  • method: The method used for loading data, determined by the rebuild_method function using the replace flag from the profile. It decides whether data is upserted into or fully replaces the Redshift table.
import logging


def rebuild_method(replace_flag, **kwargs):
    # Default to UPSERT unless the table is replaced daily or a full rebuild is requested
    method = "UPSERT"

    if replace_flag:
        method = "REPLACE"
    else:
        dag_run = kwargs.get("dag_run")
        dag_run_conf = dag_run.conf if dag_run else None
        if dag_run_conf and dag_run_conf.get("rebuild_mode") == "full":
            method = "REPLACE"
        # Otherwise, method remains "UPSERT"

    logging.info("Method Rebuild-----------> %s", method)
    return method
  • upsert_keys: The primary key column(s) of the table. They are used for the UPSERT method, which this operator implements as a delete of matching rows followed by an insert, rather than an in-place update.
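To make that concrete, the UPSERT method behaves roughly like the SQL below. This is a conceptual sketch only, not the operator’s exact statements: the table and key names come from the snp_customers profile, and the angle-bracket placeholders are filled in by the operator at runtime from s3_key and aws_conn_id.

# Conceptual sketch only (not the operator's literal SQL): how method="UPSERT"
# with upsert_keys=["id"] behaves for the snp_customers table. The operator
# stages the COPY into a temporary table, then deletes and re-inserts matching rows.
upsert_sketch = """
BEGIN;
CREATE TEMP TABLE snp_customers_stage (LIKE eng.snp_customers);
COPY snp_customers_stage FROM 's3://datamart_pod/<s3_key>'
    <credentials clause built from aws_conn_id>
    FORMAT AS PARQUET FILLRECORD;
DELETE FROM eng.snp_customers
    USING snp_customers_stage
    WHERE eng.snp_customers.id = snp_customers_stage.id;
INSERT INTO eng.snp_customers SELECT * FROM snp_customers_stage;
COMMIT;
"""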

Conclusion

In this article, we walked through the process of building a data pipeline using Apache Airflow to transfer data from a data source to Amazon S3 and then to Amazon Redshift. We divided the pipeline into three phases: loading data from the source, transferring data to S3, and transferring data from S3 to Redshift.

I hope you enjoyed it!
