Build a Pipeline for Transferring Data from S3 to Redshift with Apache Airflow
Apache Airflow, a powerful workflow orchestration tool, simplifies the management of complex data pipelines. In this article, we will guide you through the process of building a data pipeline that transfers data from a data source to Amazon S3, and then to Amazon Redshift using Apache Airflow.
We will break down the pipeline into three phases:
- Loading Data from the Source
- Transferring Data to S3
- Transferring Data from S3 to Redshift
Phase 1: Loading Data from the Source
The first phase involves extracting data from the source system. This could be a database, an API, or any other data source. We’ll use Airflow Python operators to fetch the data and prepare it for transfer to S3.
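Before the functions themselves, here is a minimal sketch of the imports and helpers the snippets in this article assume (Airflow 2.x with the Amazon, Postgres, and MySQL providers installed); path_files and slack_failed_callback are placeholders you would adapt to your environment.
import logging
from datetime import datetime, timedelta

import pyarrow as pa
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.task_group import TaskGroup

# Local path template: first slot is the DAG ID (folder), second is the file name.
# The base directory here is an assumption for illustration.
path_files = "/tmp/{}/{}"

# Placeholder failure callback; replace with your own Slack notification helper.
def slack_failed_callback(context):
    logging.error("Task failed: %s", context["task_instance_key_str"])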
def customers(conn_str, ds: str, ds_nodash: str, dag_id: str, **kwargs):
    # Pull a daily snapshot of the customers table for the run's logical date.
    df = conn_str.get_pandas_df(
        f"""
        with transformed_customers as (
            select
                id,
                creation_date,
                customer_fullname,
                customer_city,
                customer_address
            from customers
        )
        select * from transformed_customers
        where creation_date = '{ds}';
        """
    )
    # Define a PyArrow schema so the column types match the Redshift table.
    schema = pa.schema(
        [
            pa.field("id", pa.string()),
            pa.field("creation_date", pa.date32()),
            pa.field("customer_fullname", pa.string()),
            pa.field("customer_city", pa.string()),
            pa.field("customer_address", pa.string()),
        ]
    )
    print("Size Of DataFrame----->", df.shape)
    # Save the snapshot locally as Parquet, then share its path via XCom.
    file_dir = path_files.format(dag_id, f"customers_{ds_nodash}.pqt")
    df.to_parquet(file_dir, schema=schema, index=False)
    kwargs["ti"].xcom_push(key=dag_id, value=file_dir)
def branch(conn_str, dag_id: str, **kwargs):
    # Pull the full branch table (it is replaced daily, so no date filter is needed).
    df = conn_str.get_pandas_df(
        """
        with transformed_branch as (
            select
                id,
                creation_date,
                branch_name,
                branch_city,
                branch_code
            from branch
        )
        select * from transformed_branch;
        """
    )
    schema = pa.schema(
        [
            pa.field("id", pa.string()),
            pa.field("creation_date", pa.date32()),
            pa.field("branch_name", pa.string()),
            pa.field("branch_city", pa.string()),
            pa.field("branch_code", pa.string()),
        ]
    )
    print("Size Of DataFrame----->", df.shape)
    # ds_nodash comes from the Airflow context since it is not an explicit parameter here.
    file_dir = path_files.format(dag_id, f"branch_{kwargs['ds_nodash']}.pqt")
    df.to_parquet(file_dir, schema=schema, index=False)
    kwargs["ti"].xcom_push(key=dag_id, value=file_dir)
Extracting Data from Two Different Tables: Customers and Branches
In this section, we will explain how to extract data from two different tables, customers and branch, using Apache Airflow. Let's walk through the customers function parameters:
- conn_str: A connection object (a database hook) used to query the source database.
- ds: A template field in Airflow that provides the DAG run's logical date in the format YYYY-MM-DD. It is used in the SQL query to take a daily snapshot of the customers table (for a daily schedule, this is the current date minus one day).
- ds_nodash: A template field in Airflow that provides the same logical date without dashes (YYYYMMDD). It is used to build the file name of the snapshot.
- dag_id: A template field in Airflow that provides the ID of the DAG. It is used to create a dynamic folder named after the DAG ID, where the DataFrame is saved as a Parquet file.
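To make the templating concrete, here is roughly how the function gets called for an example run date once Airflow has rendered these fields (the date is illustrative):
# Illustrative only: once Airflow renders the template fields, the call for the
# 2024-06-03 run is roughly equivalent to the following (the real call also passes
# the rest of the Airflow context, e.g. ti, into **kwargs).
customers(
    conn_str=PostgresHook("pos_conn"),  # the hook configured in the table profile later in this article
    ds="2024-06-03",                    # logical date with dashes, used in the SQL filter
    ds_nodash="20240603",               # logical date without dashes, used in the file name
    dag_id="s3_to_redshift_pipeline",   # used for the local folder and the XCom key
)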
Inside the function body:
- Schema Definition: A PyArrow schema is defined to match the DataFrame columns with the corresponding Redshift data types, ensuring data quality and compatibility.
- Save DataFrame to Parquet: The DataFrame size is printed for debugging, and the DataFrame is saved as a Parquet file at the specified location with the defined schema.
- Push File Path to XCom: The file path is pushed to Airflow's XCom so that downstream tasks in the DAG can retrieve it.
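For context, a downstream task can read that path back from XCom either in Python or, as we do later with the upload operator, through a Jinja template. A minimal sketch (the task ID and key follow the naming used later in this article):
def consume_path(**kwargs):
    # Pull the Parquet file path pushed by the customers extract task.
    file_dir = kwargs["ti"].xcom_pull(
        task_ids="snp_customers_group.read_from_snp_customers_task",
        key=kwargs["dag"].dag_id,
    )
    print("Parquet file to upload:", file_dir)

# The equivalent Jinja template, usable in any templated operator field:
# "{{ ti.xcom_pull(task_ids='snp_customers_group.read_from_snp_customers_task', key=dag.dag_id) }}"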
We didn't use ds in the branch query because the two tables behave differently: the customers table is append-only, with new records added daily, whereas the branch table is completely replaced each day to reflect the most up-to-date data.
Phase 2: Transferring Data to S3
In the second phase, we transfer the extracted data to Amazon S3 for storage. This involves uploading the locally saved Parquet file to an S3 bucket using the LocalFilesystemToS3Operator.
If we have multiple tables, such as the two in this example, we need a separate LocalFilesystemToS3Operator for each one: one for customers and another for branch. However, with more tables, say five or more, writing an individual operator for each would be inefficient. Instead, we can create a function that generates the necessary profile for each table and use a loop to apply the same operators to all of them. This approach streamlines the process and scales better when managing multiple tables.
def generate_tables_profile():
    # One profile per table: how to extract it, which hook to use, and how to write it to S3/Redshift.
    tables_profile = {
        "snp_customers": {
            "upsert_keys": ["id"],
            "replace": False,
            "function": customers,
            "params": {
                "conn_str": PostgresHook("pos_conn"),
                "dag_id": "{{ dag.dag_id }}",
            },
        },
        "snp_branch": {
            "upsert_keys": None,
            "replace": True,
            "function": branch,
            "params": {
                "conn_str": MySqlHook("mysql_conn"),
                "dag_id": "{{ dag.dag_id }}",
            },
        },
    }
    return tables_profile
This function generates a profile for each table: the extraction function to call, its parameters, the upsert keys, and whether the data should be replaced rather than appended in S3 and Redshift.
default_args = {
    "owner": "Ahmed",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=30),
}

with DAG(
    "s3_to_redshift_pipeline",
    default_args=default_args,
    description="A simple data pipeline to transfer data from S3 to Redshift",
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    tables_profile = generate_tables_profile()

    for table_name, profile in tables_profile.items():
        with TaskGroup(group_id=f"{table_name}_group") as tg:
            # Extract the table from the source database and save it locally as Parquet.
            read_from_db_task = PythonOperator(
                task_id=f"read_from_{table_name}_task",
                python_callable=profile["function"],
                op_kwargs=profile["params"],
                provide_context=True,
                on_failure_callback=slack_failed_callback,
                dag=dag,
            )

            # Upload the local Parquet file (path pulled from XCom) to the S3 bucket.
            upload_to_s3_operator_task = LocalFilesystemToS3Operator(
                task_id=f"write_{table_name}_To_S3",
                filename="{{ ti.xcom_pull(task_ids=params.task, key=dag.dag_id) }}",
                dest_key=dest_key_folder(table_name, profile["replace"]),
                dest_bucket="datamart_pod",
                aws_conn_id="AWS_CONN_S3_TO_REDSHIFT",
                replace=profile["replace"],
                params={
                    "task": f"{table_name}_group.read_from_{table_name}_task",
                },
                on_failure_callback=slack_failed_callback,
                dag=dag,
            )
Let's explain the LocalFilesystemToS3Operator:
- task_id: Unique identifier for the task.
- filename: Retrieves the local file path from XCom, where the previous task stored it.
- dest_key: Destination key (path) in the S3 bucket. The dest_key_folder function generates this key based on the table's replace flag. If replace_flag is False, the path includes the year, month, and day so the files are organized by date, for example datamart-pod/customers/2024/06/03/customers_20240603.pqt. If replace_flag is True, the path is simpler and skips the date folders, following the format prefix/table_name/file_name.
def dest_key_folder(table: str, replace_flag: bool):
    # Fully replaced tables get a flat key; append-only tables get a date-partitioned key.
    if replace_flag:
        return "{}/{}/{}".format(
            "datamart-pod",
            table,
            "{{ ti.xcom_pull(task_ids=params.task, key=dag.dag_id).split('/')[-1] }}",
        )
    else:
        return "{}/{}/{}/{}/{}/{}".format(
            "datamart-pod",
            table,
            "{{ ds.split('-')[0] }}",
            "{{ ds.split('-')[1] }}",
            "{{ ds.split('-')[2] }}",
            "{{ ti.xcom_pull(task_ids=params.task, key=dag.dag_id).split('/')[-1] }}",
        )
- dest_bucket: The S3 bucket name.
- aws_conn_id: AWS connection ID configured in Airflow.
- replace: Indicates whether to replace the file in S3 if it already exists.
- params: Extra parameters made available to the task's templates; here it carries the task ID of the upstream extract task so the filename template knows where to pull the XCom from.
Phase 3: Transferring Data from S3 to Redshift
In the final phase, we transfer the data stored in Amazon S3 to Amazon Redshift for further analysis and processing. This is done with Airflow's S3ToRedshiftOperator, which copies the data efficiently from S3 into Redshift.
Inside the same loop (within each table's TaskGroup), we add this operator:
            # Copy the Parquet file from S3 into the Redshift table.
            transfer_to_redshift_task = S3ToRedshiftOperator(
                task_id=f"transfer_{table_name}_to_redshift",
                redshift_conn_id="Redshift_dm_conn",
                s3_bucket="datamart_pod",
                s3_key=s3_key_template(profile["replace"]),
                schema="eng",
                aws_conn_id="AWS_CONN_S3_TO_REDSHIFT",
                table=table_name,
                copy_options=["FORMAT AS PARQUET", "FILLRECORD"],
                method=rebuild_method(profile["replace"]),
                upsert_keys=profile["upsert_keys"],
                params={
                    "task": f"{table_name}_group.read_from_{table_name}_task",
                    "table": table_name,
                },
                on_failure_callback=slack_failed_callback,
                dag=dag,
            )

        read_from_db_task >> upload_to_s3_operator_task >> transfer_to_redshift_task
So how does it work?
Parameters:
- redshift_conn_id: The Airflow connection ID for Redshift.
- s3_bucket: The name of the S3 bucket.
- s3_key: The S3 key (path) to the data file. It is generated by the s3_key_template function, which uses the replace flag from the profile to determine the key structure (a sketch of this helper appears after the parameter list).
- schema: The Redshift schema where the table resides.
- aws_conn_id: The Airflow connection ID for AWS credentials.
- table: The name of the Redshift table where the data will be loaded, taken from the profile.
- copy_options: A list of options for the Redshift COPY command; here it formats the data as Parquet and fills in missing fields (["FORMAT AS PARQUET", "FILLRECORD"]).
- method: The load method, determined by the rebuild_method function from the profile's replace flag. It specifies whether to upsert or replace the data in the Redshift table.
def rebuild_method(replace_flag, **kwargs):
    # Default to UPSERT; switch to REPLACE for fully rebuilt tables or full-rebuild runs.
    method = "UPSERT"
    if replace_flag:
        method = "REPLACE"
    else:
        dag_run = kwargs.get("dag_run")
        dag_run_conf = dag_run.conf if dag_run else None
        if dag_run_conf and dag_run_conf.get("rebuild_mode") == "full":
            method = "REPLACE"
        # Otherwise, method remains "UPSERT".
    logging.info("Method Rebuild -----------> %s", method)
    return method
- upsert_keys: The primary key(s) of the table, used for upserting. Note that this operator implements the upsert as a delete of matching rows followed by an insert.
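The s3_key_template helper referenced above is not shown in the snippets; here is a minimal sketch, under the assumption that it simply rebuilds the same key layout that dest_key_folder produced for the upload, using the task and table values passed in params:
def s3_key_template(replace_flag: bool):
    # Hypothetical helper: reproduce the key layout used by dest_key_folder.
    if replace_flag:
        # Flat key for fully replaced tables.
        return (
            "datamart-pod/{{ params.table }}/"
            "{{ ti.xcom_pull(task_ids=params.task, key=dag.dag_id).split('/')[-1] }}"
        )
    # Date-partitioned key for append-only tables.
    return (
        "datamart-pod/{{ params.table }}/"
        "{{ ds.split('-')[0] }}/{{ ds.split('-')[1] }}/{{ ds.split('-')[2] }}/"
        "{{ ti.xcom_pull(task_ids=params.task, key=dag.dag_id).split('/')[-1] }}"
    )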
Conclusion
In this article, we walked through the process of building a data pipeline using Apache Airflow to transfer data from a data source to Amazon S3 and then to Amazon Redshift. We divided the pipeline into three phases: loading data from the source, transferring data to S3, and transferring data from S3 to Redshift.
I hope you enjoyed it!