Apache Airflow is a widely used workflow management platform for orchestrating data engineering pipelines.
This article walks through a practical example: migrating data from MySQL to Hive with Apache Airflow.
The step-by-step guide uses Airflow's HiveOperator and MySqlToHiveOperator within a Directed Acyclic Graph (DAG), so the transfer runs as an automated, repeatable workflow.
Scenario Overview: Migrating Data from MySQL to Hive
In this use case, we will migrate data from the employees table in the sampleDB database of MySQL to the companydb database in Hive.
A: Verify Source Data in MySQL
To confirm the data in the source table, execute the following query in the MySQL CLI:
mysql> select * from sampleDB.employees limit 2;
Output:
+------------+--------+------------+-----------------+----------+---------------+-------------------+-----------+
| first_name | gender | start_date | last_login_time | salary | bonus_percent | senior_management | team |
+------------+--------+------------+-----------------+----------+---------------+-------------------+-----------+
| Douglas | Male | 1993-08-06 | 12:42:00 | 97308.00 | 6.945 | 1 | Marketing |
| Thomas | Male | 1996-03-31 | 06:53:00 | 61933.00 | 4.170 | 1 | |
+------------+--------+------------+-----------------+----------+---------------+-------------------+-----------+
2 rows in set (0.03 sec)
B: Prepare the Target Database in Hive
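Create the target database from the Beeline prompt, switch to it, and list its tables:
0: jdbc:hive2://localhost:10000/> CREATE DATABASE companydb;
0: jdbc:hive2://localhost:10000/> USE companydb;
0: jdbc:hive2://localhost:10000/> SHOW TABLES;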
Output:
+-----------+
| tab_name |
+-----------+
+-----------+
(No tables exist in companydb at this stage.)
Create a new Python file named export_mysql_to_hive.py in your Airflow DAGs folder (/airflow/dags/). You can use any text editor or IDE (e.g., nano, vim, PyCharm).
Include the necessary Python dependencies for the workflow:
import pendulum
from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.providers.apache.hive.transfers.mysql_to_hive import MySqlToHiveOperator
from datetime import timedelta
Set default configurations for the DAG, such as retries and ownership:
default_args = {
    'owner': 'data_engineering',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
Note: Additional arguments such as email, email_on_failure, and depends_on_past are not set here but can be added to default_args as needed.
Configure the DAG with a unique ID, schedule, and description:
dag_export_mysql_to_hive = DAG(
    dag_id='Export_mysql_to_hive',
    default_args=default_args,
    schedule='@once',  # Schedule exactly one run
    start_date=pendulum.yesterday('Asia/Dhaka'),  # Set your timezone
    dagrun_timeout=timedelta(minutes=60),
    description='Transfer data from MySQL to Hive for employees table',
)
Use a SQL query to extract the data from MySQL and join the fields of each row into a single tab-delimited (\t) string:
mysqlquery = """
SELECT CONCAT_WS('\t', first_name, gender, start_date, last_login_time, salary, bonus_percent, senior_management, team)
FROM sampleDB.employees;
"""
Task 1: Create the Hive Table
Define a HiveOperator task to create an external Hive table with the correct schema and delimiter:
create_hive_table = HiveOperator(
    task_id="create_hive_table",
    hql="""
        CREATE EXTERNAL TABLE IF NOT EXISTS companydb.employees (
            first_name STRING,
            gender STRING,
            start_date STRING,
            last_login_time STRING,
            salary DOUBLE,
            bonus_percent DOUBLE,
            senior_management INT,
            team STRING
        )
        ROW FORMAT DELIMITED
        FIELDS TERMINATED BY '\\t'
        STORED AS TEXTFILE;
    """,
    hive_cli_conn_id="hive_default",
    dag=dag_export_mysql_to_hive,
)
Task 2: Transfer Data from MySQL to Hive
Use the MySqlToHiveOperator to export data from MySQL to the Hive table:
export_mysql_to_hive = MySqlToHiveOperator(
    task_id='export_mysql_to_hive',
    sql=mysqlquery,
    hive_table='companydb.employees',
    create=False,  # Table is already created by the previous task
    recreate=False,  # Avoid dropping and recreating the table
    mysql_conn_id='mysql_default',  # Connection ID for MySQL
    hive_cli_conn_id='hive_default',  # Connection ID for Hive
    dag=dag_export_mysql_to_hive,
)
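One caveat about the query passed to this task: MySQL's CONCAT_WS skips NULL arguments, so a NULL value would shift the fields that follow it one position to the left in the exported line. A minimal sketch of a null-safe variant is shown below. The helper name build_export_query and the COLUMNS list are hypothetical, introduced only for illustration, and replacing NULL with an empty string is an assumption about how missing values should be represented in the exported line.

```python
# Column order must match the Hive table definition.
COLUMNS = [
    "first_name", "gender", "start_date", "last_login_time",
    "salary", "bonus_percent", "senior_management", "team",
]


def build_export_query(table, columns):
    """Return a SELECT that joins the columns of each row with tabs.

    Each column is wrapped in IFNULL(col, '') so that CONCAT_WS, which
    skips NULL arguments, always emits one field per column.
    """
    fields = ", ".join(f"IFNULL({col}, '')" for col in columns)
    # A literal tab character is embedded in the SQL string, as in the
    # original query.
    return f"SELECT CONCAT_WS('\t', {fields})\nFROM {table};"


mysqlquery = build_export_query("sampleDB.employees", COLUMNS)
print(mysqlquery)
```

The resulting string can be passed to the operator through the same sql argument. Generating the field list from COLUMNS also keeps the column order in one place.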
Set the execution order of tasks using the bitshift operator (>>):
create_hive_table >> export_mysql_to_hive
This ensures the Hive table is created before data is exported from MySQL.
Combine all sections into the final export_mysql_to_hive.py file:
import pendulum
from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.providers.apache.hive.transfers.mysql_to_hive import MySqlToHiveOperator
from datetime import timedelta

default_args = {
    'owner': 'data_engineering',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag_export_mysql_to_hive = DAG(
    dag_id='Export_mysql_to_hive',
    default_args=default_args,
    schedule='@once',
    start_date=pendulum.yesterday('Asia/Dhaka'),
    dagrun_timeout=timedelta(minutes=60),
    description='Transfer data from MySQL to Hive for employees table',
)

mysqlquery = """
SELECT
CONCAT_WS('\t', first_name, gender, start_date, last_login_time, salary, bonus_percent, senior_management, team)
FROM
sampleDB.employees;
"""

create_hive_table = HiveOperator(
    task_id="create_hive_table",
    hql="""
        CREATE EXTERNAL TABLE IF NOT EXISTS companydb.employees (
            first_name STRING,
            gender STRING,
            start_date STRING,
            last_login_time STRING,
            salary DOUBLE,
            bonus_percent DOUBLE,
            senior_management INT,
            team STRING
        )
        ROW FORMAT DELIMITED
        FIELDS TERMINATED BY '\\t'
        STORED AS TEXTFILE;
    """,
    hive_cli_conn_id="hive_default",
    dag=dag_export_mysql_to_hive,
)

export_mysql_to_hive = MySqlToHiveOperator(
    task_id='export_mysql_to_hive',
    sql=mysqlquery,
    hive_table='companydb.employees',
    create=False,
    recreate=False,
    mysql_conn_id='mysql_default',
    hive_cli_conn_id='hive_default',
    dag=dag_export_mysql_to_hive,
)

create_hive_table >> export_mysql_to_hive
Configure Airflow Connections for MySQL and Hive.
To fill in the Airflow MySQL connection form, follow the steps shown in Figure 2 (Add Connection):
Additional Fields (if available)
If there are additional fields like Extra, you can leave them empty unless you have specific
connection parameters to add (e.g., SSL settings, connection timeouts).
Save the Connection
After filling in the fields, click Save to store the connection in Airflow.
Connection 2: for Hive:
To fill in the fields for your Hive connection in Airflow, use the following details based on the image (Figure: 3. Add Connection) and typical Hive configurations:
Standard Fields
Additional Configuration (if needed)
The connections now appear in the connection list as follows:
Note: For this project, the connection ID for MySQL is mysql_default and the connection ID for Hive is hive_default, as shown in Figure 4.
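As an alternative to the web form, Airflow can also read connections from environment variables named AIRFLOW_CONN_<CONN_ID> that hold a connection URI. The sketch below builds such URIs for the two connection IDs used in this project. The host names, the MySQL port, the user name, and the password are placeholder assumptions, build_conn_uri is a hypothetical helper, and the hive-cli scheme assumes the Hive CLI connection type. Adjust all of them to your environment.

```python
import os
from urllib.parse import quote


def build_conn_uri(scheme, host, port, login="", password="", schema=""):
    """Build a connection URI of the form scheme://login:password@host:port/schema."""
    auth = ""
    if login:
        auth = quote(login, safe="")
        if password:
            auth += ":" + quote(password, safe="")
        auth += "@"
    return f"{scheme}://{auth}{host}:{port}/{quote(schema, safe='')}"


# Placeholder values (assumptions): replace with your own settings.
mysql_uri = build_conn_uri(
    "mysql", "localhost", 3306, "airflow_user", "airflow_pass", "sampleDB"
)
hive_uri = build_conn_uri("hive-cli", "localhost", 10000, schema="companydb")

# The variable name is AIRFLOW_CONN_ followed by the upper-cased connection ID.
os.environ["AIRFLOW_CONN_MYSQL_DEFAULT"] = mysql_uri
os.environ["AIRFLOW_CONN_HIVE_DEFAULT"] = hive_uri

print(mysql_uri)
print(hive_uri)
```

Variables set this way only affect the current process. In a real deployment they would typically be exported in the environment of the Airflow scheduler and workers instead.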
On the screen, you can see the DAG ID "Export_mysql_to_hive" in Airflow in its idle state, indicating that the DAG has not yet been executed.
Note: See the top of Figure 5.
To start the execution of the DAG, follow these steps:
Following these steps runs the DAG and lets you monitor it through to successful completion.
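The DAG can also be started from a terminal with the Airflow command-line interface instead of the web UI. The following sketch only assembles and prints the two relevant commands, airflow dags unpause and airflow dags trigger. The helper name build_trigger_commands is hypothetical, and the sketch assumes the airflow executable is installed in the same environment as the scheduler.

```python
import shlex

DAG_ID = "Export_mysql_to_hive"


def build_trigger_commands(dag_id):
    """Return the Airflow CLI commands that unpause and trigger a DAG."""
    return [
        ["airflow", "dags", "unpause", dag_id],
        ["airflow", "dags", "trigger", dag_id],
    ]


for cmd in build_trigger_commands(DAG_ID):
    # Print each command; to execute it instead, pass the list to
    # subprocess.run(cmd, check=True).
    print(shlex.join(cmd))
```

Because the DAG uses schedule='@once', unpausing it is usually enough for the scheduler to start the single scheduled run. The trigger command starts an additional manual run.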
To confirm that the employees table data has been successfully transferred from MySQL to the Hive database companydb, follow these steps:
0: jdbc:hive2://localhost:10000/> USE companydb;
0: jdbc:hive2://localhost:10000/> SHOW TABLES;
Output:
+-----------+
| tab_name |
+-----------+
| employees |
+-----------+
1 row selected (0.059 seconds)
Ensure that the employees table is listed.
0: jdbc:hive2://localhost:10000/> SELECT * FROM employees LIMIT 5;
Output:
+-----------------------+-------------------+-----------------------+----------------------------+-------------------+--------------------------+------------------------------+------------------+
| employees.first_name | employees.gender | employees.start_date | employees.last_login_time | employees.salary | employees.bonus_percent | employees.senior_management | employees.team |
+-----------------------+-------------------+-----------------------+----------------------------+-------------------+--------------------------+------------------------------+------------------+
| Douglas | Male | 1993-08-06 | 12:42:00 | 97308.0 | 6.945 | 1 | Marketing |
| Thomas | Male | 1996-03-31 | 06:53:00 | 61933.0 | 4.17 | 1 | |
| Maria | Female | 1993-04-23 | 11:17:00 | 130590.0 | 11.858 | 0 | Finance |
| Jerry | Male | 2005-03-04 | 13:00:00 | 138705.0 | 9.34 | 1 | Finance |
| Larry | Male | 1998-01-24 | 16:47:00 | 101004.0 | 1.389 | 1 | Client Services |
+-----------------------+-------------------+-----------------------+----------------------------+-------------------+--------------------------+------------------------------+------------------+
5 rows selected (2.195 seconds)
Verify that the data matches the records from your MySQL employees table.
These checks confirm that the data has been transferred correctly from MySQL to Hive.
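When comparing the two outputs, note that the same number can be printed differently by each system: MySQL shows 97308.00 and 4.170 where Hive shows 97308.0 and 4.17. A minimal sketch of a comparison that tolerates this is given below. It uses the first two sample rows from the outputs above, entered by hand as strings. The names normalize_row and NUMERIC_COLUMNS are hypothetical, and in practice the rows would come from queries against both systems.

```python
from decimal import Decimal

COLUMNS = [
    "first_name", "gender", "start_date", "last_login_time",
    "salary", "bonus_percent", "senior_management", "team",
]
# Columns that hold numbers and should be compared by value, not by text.
NUMERIC_COLUMNS = {"salary", "bonus_percent", "senior_management"}


def normalize_row(row):
    """Convert a row of strings into values that compare equal across systems."""
    normalized = []
    for name, value in zip(COLUMNS, row):
        value = value.strip()
        if name in NUMERIC_COLUMNS and value != "":
            normalized.append(Decimal(value))
        else:
            normalized.append(value)
    return tuple(normalized)


mysql_rows = [
    ("Douglas", "Male", "1993-08-06", "12:42:00", "97308.00", "6.945", "1", "Marketing"),
    ("Thomas", "Male", "1996-03-31", "06:53:00", "61933.00", "4.170", "1", ""),
]
hive_rows = [
    ("Douglas", "Male", "1993-08-06", "12:42:00", "97308.0", "6.945", "1", "Marketing"),
    ("Thomas", "Male", "1996-03-31", "06:53:00", "61933.0", "4.17", "1", ""),
]

mismatches = [
    (m, h) for m, h in zip(mysql_rows, hive_rows)
    if normalize_row(m) != normalize_row(h)
]
print("rows match" if not mismatches else f"{len(mismatches)} mismatched rows")
```

Comparing numeric columns as Decimal values rather than as text avoids false mismatches caused only by trailing zeros.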
In this guide, we walked through migrating data from MySQL to Hive using Apache Airflow. A two-task DAG first creates the Hive table with HiveOperator and then loads the MySQL query results with MySqlToHiveOperator, and a final check in Hive confirms that the rows match the source. Because the pipeline is defined as code, it can be rerun, scheduled, and monitored from the Airflow UI.
