Data Pipeline — Replicate data from MySQL to Snowflake with AWS Kinesis, Binlog Streamer Python Library & Snowpipe

Shreyas MS
18 min read · Jul 26, 2023

In this post we will see how to automatically replicate data from a MySQL (OLTP) database to the Snowflake (OLAP) cloud data warehouse whenever data changes in the source database.

This article is a little lengthy because I have tried to explain every step in detail, so please bear with me.

We will use the services below to achieve the replication.

  • MySQL Relational Database
  • Amazon EC2 Machine with Binlog Streamer Python Library & Tmux
  • Amazon Kinesis Data Streams & Firehose for streaming the data.
  • Amazon S3 to store the CDC log files.
  • Snowflake Snowpipe to load the data into Snowflake tables
  • Snowflake Dynamic Tables to load the changed data.

Steps followed to achieve the replication:

  1. Create a Parameter Group, spin up the MySQL RDS Database & test the connection using an IDE.
  2. Create an Amazon Kinesis Data Stream & Firehose.
  3. Create an IAM Role & Amazon EC2 machine.
  4. Install the required libraries on the Amazon EC2 machine.
  5. Replicate CDC log files (JSON Format) from Data Lake (AWS S3) to Snowflake Tables.

Pre-requisites

This article assumes that the following objects are already configured:

— AWS & Snowflake accounts.

— Basic knowledge of SQL, Python & shell scripting.

Step 1 → Create a Parameter Group, spin up the MySQL RDS Database & test the connection using an IDE.

Let's create a new RDS Parameter Group & modify the “binlog_format” parameter.
The binary log is a set of log files that contain information about the data modifications made to the MySQL server.

— In the AWS console, navigate to Amazon RDS, click on Parameter groups & create the new parameter group as shown below.

— Open that Parameter Group, search for the “binlog_format” parameter, edit it, change the value to ROW so that all row-level changes are tracked, and save it.
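If you prefer scripting over clicking through the console, the same parameter change can be sketched with boto3. This is only a minimal illustration: it assumes boto3 is installed and AWS credentials are configured, and the group name and region passed in are placeholders of your own choosing, not names used elsewhere in this article.

```python
def binlog_row_parameter():
    """Build the parameter entry that switches binlog_format to ROW."""
    return {
        "ParameterName": "binlog_format",
        "ParameterValue": "ROW",
        # Assumption: the parameter can be applied without a reboot.
        # Switch to "pending-reboot" if RDS rejects "immediate".
        "ApplyMethod": "immediate",
    }


def set_binlog_format_row(group_name, region_name):
    """Apply the change to an existing custom RDS parameter group."""
    import boto3  # imported lazily so the helper above works without boto3

    rds = boto3.client("rds", region_name=region_name)
    return rds.modify_db_parameter_group(
        DBParameterGroupName=group_name,
        Parameters=[binlog_row_parameter()],
    )
```

Calling set_binlog_format_row with your own parameter group name and region would make the same edit as the console steps above.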

Let's create the Amazon RDS MySQL database. While creating it we must select the new Parameter Group which we created & also enable automated backups without fail, because RDS turns binary logging on only when automated backups are enabled.

— In the AWS console, navigate to Amazon RDS, click on Create database & spin up the MySQL database with the default configuration, modifying only the few settings shown below. For more details about creating the RDS MySQL database click here.

  • Change the “Public access” to Yes.

Note: With this option enabled, the database is publicly reachable by anyone who has the connection details. Since this is for learning we can enable it, but do follow the secure configuration methods suggested by AWS.

  • In the DB parameter group, select the new group which we created earlier.
  • Make sure “Enable automated backups” is selected & set the Backup retention period to 2 or 3 days, as you prefer.

It will take some time for the RDS database to be up & running. Once the DB is ready we will connect to it using an IDE of our choice and run the SQL scripts to create a table.

Now we will make some changes to the security group attached to the RDS database instance, so that we can access the database from anywhere.

— In the AWS RDS console, click on the MySQL DB which was created. In the Connectivity & security tab, click on the VPC default security group; in the new window, edit the inbound rules, add a new rule as shown below & save it. This will allow us to connect to the RDS database from anywhere.

Now we will connect to the RDS database through the IDE of our choice (in my case DBeaver) and run some SQL queries.

— In the AWS console, navigate to the Amazon RDS window, click on the MySQL database which we created earlier and copy the Endpoint & Port details from the Connectivity & security tab as shown below.

— Open the IDE and create a new connection using the Endpoint and Port details. Here we need to provide the Username & Password which we set while creating the RDS database.

After connecting to the database through the IDE we can run the SQL code and create the database objects.

The binlog retention hours parameter is used to specify the number of hours to retain binary log files. Amazon RDS normally purges a binary log as soon as possible, but the binary log might still be required for replication with a MySQL database external to RDS.
The default value of binlog retention hours is NULL. For RDS for MySQL, NULL means binary logs aren't retained (0 hours).
To specify the number of hours to retain binary logs on a DB instance, use the mysql.rds_set_configuration stored procedure and specify a period with enough time for replication to occur. For more details click here

So, to change this, run the code below in the IDE to set the retention to 12 hours.

call mysql.rds_set_configuration('binlog retention hours', 12);
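Before moving on, it can be worth confirming that both settings took effect. The sketch below is one way to do that from Python, assuming the pymysql driver is available (it is what the Binlog Streamer library is built on) and that the cursor returns rows as tuples. The connection values are placeholders you fill in yourself; mysql.rds_show_configuration is the RDS stored procedure that lists the current configuration.

```python
def read_binlog_settings(cursor):
    """Return binlog_format and the RDS binlog retention setting."""
    cursor.execute("SHOW VARIABLES LIKE 'binlog_format'")
    row = cursor.fetchone()
    binlog_format = row[1] if row else None

    # Rows are expected as (name, value, description) tuples.
    cursor.execute("CALL mysql.rds_show_configuration")
    retention = None
    for name, value, *_ in cursor.fetchall():
        if name == "binlog retention hours":
            retention = value
    return {"binlog_format": binlog_format, "binlog retention hours": retention}


def check_rds(host, port, user, password):
    """Connect to the RDS instance and print the two settings."""
    import pymysql  # imported lazily; only needed for a real connection

    conn = pymysql.connect(host=host, port=port, user=user, password=password)
    try:
        with conn.cursor() as cursor:
            print(read_binlog_settings(cursor))
    finally:
        conn.close()
```

If everything is configured as described, the printed values should show ROW for binlog_format and 12 for the retention hours.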

Now let's create some tables on which we want to implement CDC.

— Click on this GitHub link, where you can find the CREATE TABLE scripts. Copy them and execute them in the IDE to create the tables.

We are done with Step 1 & will proceed to the next step.

Step 2 → Create an Amazon Kinesis Data Stream & Firehose.

In this step, we will create a Data Stream which will collect the streaming data, and a Data Firehose which will deliver the streaming data to the destination (i.e. Amazon S3).

Before creating the Data Stream we need an Amazon S3 bucket for storing the CDC logs, so let's create it.

— In the AWS console, navigate to Amazon S3 & create the S3 bucket. For more details click here.
— Just provide a unique value for the bucket name, keep the other configurations as-is and create it.

Now we will create the Data Stream:

— In the AWS console, navigate to Amazon Kinesis & click on Create data stream
— Enter the Data stream name & select the Provisioned option. Since this task is for learning purposes, let's create the stream with just 1 shard. To know more click here.

Now we will create the Data Firehose and use the above Data Stream as the source and the S3 bucket as the destination. To know more click here.

— In the AWS console, navigate to Amazon Kinesis, select Kinesis Data Firehose and create it. Please use the configuration below.

  • Select the source as Kinesis Data Stream & Destination as S3 bucket.
  • Now select the Data Stream & S3 bucket which we created earlier.
  • Enable the Dynamic Partitioning, then enable New line delimiter & enable Inline parsing for JSON.

Here I will dynamically partition the data based on the table names found in the source logs.
Enter the Dynamic partitioning key details as listed below.

  • Key name → table
  • JQ expression → .table
  • S3 bucket prefix → mysql-cdc-data/TableName=!{partitionKeyFromQuery:table}/
  • S3 bucket error output prefix → mysql-cdc-error
  • Buffer size → 128 MiB
  • Buffer interval → 60 seconds
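To make the partitioning settings above more concrete, here is a small sketch in plain Python of what they amount to: the JQ expression pulls the table field out of each JSON record, and that value is substituted into the S3 prefix. This only mimics the behaviour locally for illustration; it is not how Firehose is implemented, and the schema name hr in the sample record is a made-up placeholder.

```python
import json

# Same prefix as configured in the Firehose settings above.
PREFIX_TEMPLATE = "mysql-cdc-data/TableName=!{partitionKeyFromQuery:table}/"


def s3_prefix_for(record_json):
    """Mimic the `.table` JQ expression and fill in the prefix template."""
    record = json.loads(record_json)  # fails if the record is not valid JSON
    table = record["table"]
    return PREFIX_TEMPLATE.replace("!{partitionKeyFromQuery:table}", table)


sample = json.dumps({
    "schema": "hr",  # hypothetical schema name
    "table": "REGIONS",
    "type": "WriteRowsEvent",
    "row": {"values": {"REGION_ID": 1, "REGION_NAME": "Europe"}},
})
print(s3_prefix_for(sample))  # mysql-cdc-data/TableName=REGIONS/
```

The point to take away is that inline JSON parsing can only work when every record sent to the stream is valid JSON with a top-level table field.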

It will take some time to create the Kinesis Firehose. We are now done with step 2 & will proceed to the next steps.

Step 3 → Create an IAM Role & Amazon EC2 machine.

In this step we will create an IAM Role which has Kinesis & S3 access.
Then we will create an Amazon EC2 machine and attach the IAM Role to it.

— In the AWS console, navigate to IAM → Roles & click Create Role.
— Select the option as shown below & click Next.

— Search for S3 & Kinesis individually and provide full access. Click Next.

Note: Since this is for learning I am providing full access here; please read about security best practices and grant least privilege.

— Enter the Role name & create it.
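As a pointer towards least privilege, the Python script used later in this article only ever calls put_record on the Kinesis stream, so a much narrower policy than full access should be enough for it. The sketch below builds such a policy document. The region, account ID and stream name are hypothetical placeholders, and you should verify the policy against your own setup before relying on it.

```python
import json


def kinesis_put_policy(region, account_id, stream_name):
    """Build an IAM policy that only allows writing to one Kinesis stream."""
    stream_arn = f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["kinesis:PutRecord", "kinesis:PutRecords"],
                "Resource": stream_arn,
            }
        ],
    }


# Placeholder values, for illustration only.
policy = kinesis_put_policy("us-east-1", "123456789012", "mysql-cdc-stream")
print(json.dumps(policy, indent=2))
```

The printed JSON can be pasted into the IAM policy editor in place of the two full-access managed policies.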

Now let's create the Amazon EC2 machine:

— In the AWS console, navigate to Amazon EC2 & click on Launch instance.
— Now modify the few settings which are required for this replication process and create the instance as shown below.

  • Name: Provide the EC2 instance name
  • Select Ubuntu as the server type and an AMI which is Free tier eligible.
  • For Key pair name, select the “Proceed without a key pair” option.

Leave the rest of the configuration at its default values.

Once the EC2 machine is created we need to attach the IAM Role which we created earlier to it, so that we can call AWS services like Amazon Kinesis & S3 from the machine.

— In the AWS console, navigate to Amazon EC2, select the instance which we created, click on Actions, go to Security and click on Modify IAM role.

— Select the IAM Role which we created earlier from the drop-down and click on Update IAM role.

Now we are done with step 3 & will proceed to the next steps.

Step 4 → Install the required libraries on the Amazon EC2 machine.

In this step we will install two things on the EC2 machine: the Binlog Streamer Python library, which implements the MySQL replication protocol, and Tmux, a terminal multiplexer that lets you switch easily between several programs in one terminal and detach them (they keep running in the background).

— In the AWS console, navigate to Amazon EC2, select the EC2 instance & click on the Connect button.

— Then use the EC2 Instance Connect option and click Connect. This will open a new window with a command line where we can run the shell commands to install the libraries.

Now we will install the required libraries on the EC2 machine by running the commands below one by one.

Note: while running the commands, the prompt will ask for your permission to proceed. Type “Y” and press Enter.

Code to install the libraries:

sudo apt-get update

sudo apt install python3-pip

pip install mysql-replication boto3 -t .

There are 3 commands in total, which need to be executed one after the other. Take care to copy the last command in full, including the trailing “-t .”.

Here is what these commands do:

  • sudo apt-get update refreshes the package index so that apt knows the latest available versions; it does not upgrade any installed packages by itself.
  • sudo apt install python3-pip installs (or updates) pip, the Python package installer.
  • pip install mysql-replication boto3 -t . installs the Binlog Streamer (mysql-replication) library and boto3 into the current directory.
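A quick way to confirm the installation worked is to ask Python whether it can find the two modules. The sketch below is a small check of my own, not part of the libraries themselves. Because the packages were installed with -t . into the current directory, run it from that same directory, otherwise Python will not see them.

```python
import importlib.util


def missing_modules(names):
    """Return the module names that Python cannot find on its import path."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# pymysqlreplication is the import name of the mysql-replication package.
missing = missing_modules(["pymysqlreplication", "boto3"])
print("Missing modules:", missing or "none")
```

If either name is reported as missing, re-run the pip command from the directory where you plan to run the script.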

Below are some screenshots of the installation process.

Now we have everything in place to track the CDC logs and load them to S3. Let's test it by using the BinLogStreamReader class in a Python script and pushing the data to the Amazon Kinesis Data Stream.

Below is the Python script.

import json
import boto3

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)


def main():
    kinesis = boto3.client("kinesis", region_name="***AWS Region***")
    stream = BinLogStreamReader(
        connection_settings={
            "host": "***RDS Hostname***",
            "port": ***RDS Port Number***,
            "user": "***RDS Username***",
            "passwd": "***RDS Password***",
        },
        server_id=100,       # must not clash with any other replica of this server
        blocking=True,       # keep waiting for new binlog events
        resume_stream=True,  # start from the current end of the binlog
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
    )
    for binlogevent in stream:
        for row in binlogevent.rows:
            event = {
                "schema": binlogevent.schema,
                "table": binlogevent.table,
                "type": type(binlogevent).__name__,
                "row": row,
            }
            # Send valid JSON (str(event) would not be), because Firehose
            # parses each record with the ".table" JQ expression.
            # default=str covers dates, decimals and similar values.
            payload = json.dumps(event, default=str)
            # A constant partition key is fine for a single shard.
            kinesis.put_record(
                StreamName="***Kinesis Data Stream Name***",
                Data=payload,
                PartitionKey="table",
            )
            print(payload)

Copy the script to a text editor and update the RDS host name, credentials & Kinesis stream name as shown below. All these details come from the Amazon RDS database and the Amazon Kinesis Data Stream which we created in the earlier steps.

— In the AWS console, navigate to AWS RDS, click on the MySQL database which we created earlier and copy the host name from the Connectivity & security tab.

Now go back to the command prompt and run the command “python3” to enter the Python interpreter, where we can run the Python code.

Now that we are in the Python interpreter, let's run the script and see whether it captures the CDC logs.

— Paste the Python code into the command prompt session & press Enter.

— Then run the script by calling the main() function in the same session.

The Stream Reader is now active in the session and listening to the change events happening in the MySQL database. Let's make some changes, such as inserting records into the MySQL tables, and see whether those CDC logs are captured.

— Open the IDE and run the SQL INSERT commands below to load data into the tables.

Insert into REGIONS (REGION_ID,REGION_NAME) values (1,'Europe');
Insert into REGIONS (REGION_ID,REGION_NAME) values (2,'Americas');

Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('AR','Argentina',2);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('AU','Australia',3);

The moment we run the SQL commands, the Stream Reader which is listening to the change events will show the CDC logs in the command prompt session & at the same time send them on their way to the Amazon S3 bucket. (Note: it takes some time for the files to appear in the S3 bucket.)
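To give an idea of what one of these CDC log lines looks like, here is a minimal, self-contained sketch that builds an event by hand and serializes it. It assumes the row layout I have seen the Binlog Streamer library produce (values for inserts and deletes, before_values and after_values for updates), and the schema name hr is a made-up placeholder. The to_jsonable helper is my own addition: it shows one way to handle column types such as dates or decimals that JSON cannot represent directly.

```python
import json
from datetime import date, datetime
from decimal import Decimal


def to_jsonable(value):
    """Fallback for json.dumps: convert types JSON has no native form for."""
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, Decimal):
        return str(value)
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    raise TypeError(f"Cannot serialize {type(value).__name__}")


def serialize_event(schema, table, event_type, row):
    """Turn one row change into a single line of JSON."""
    event = {"schema": schema, "table": table, "type": event_type, "row": row}
    return json.dumps(event, default=to_jsonable)


insert_row = {"values": {"REGION_ID": 1, "REGION_NAME": "Europe"}}
print(serialize_event("hr", "REGIONS", "WriteRowsEvent", insert_row))
# {"schema": "hr", "table": "REGIONS", "type": "WriteRowsEvent", "row": {"values": {"REGION_ID": 1, "REGION_NAME": "Europe"}}}
```

Each line written to S3 should have this general shape, which is what the downstream Snowflake objects expect to parse.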

Note: If you want to review the data, download the file and open it in Notepad++ or any other editor. Sample data is shown below.

— Click on the file and download it.

If you can see the logs & the files in the S3 bucket, then everything we created is in place and working fine. The problem is that if the command prompt session where we executed the Python script times out or is closed, new CDC logs will no longer be captured.

So to overcome this issue we will install Tmux and run the Python Stream Reader script inside it.

— In the command prompt, press Ctrl+C to stop the running script, then press Ctrl+D (or type exit()) to leave the Python interpreter. Ctrl+Z would only suspend the process, not end it.
— Type the command below & press Enter; it will open the editor in the command prompt.

vi mysql_cdc_logs_capture.py

— Press the “i” key for INSERT mode, then paste the Python code which we wrote earlier, append the line “main()” at the end as shown below and save it. For more detail on how to use the vi editor click here.

Note: Just for reference, I have uploaded this Python file to GitHub. Click here.

— Now we will install Tmux by entering the command below in the command prompt. If Tmux is already installed, it will just be updated to the latest version. To know more about Tmux commands click here.

sudo apt-get install tmux

— Type the command below to create a new session with the name “mysql-cdc-log-capture”.

tmux new -s mysql-cdc-log-capture

You will notice the new session name at the bottom of the screen in green.

— Then in the new session just type the below command & run it.

python3 mysql_cdc_logs_capture.py

Now that the Python script is running, we need to leave this new session and go back to the old one without stopping the Python code. To do that, press Ctrl+B, release the keys, and then press D.

If the above step does not work, please check the Tmux link again.

You will notice that the command prompt is now back in the old session.

The Python code is now running continuously in the background and listening to the changes happening in the MySQL database.

If required, you can again run some SQL commands in the IDE and check whether those logs are captured.
Note: Give the data pipeline some time to process the changes before they are reflected in the S3 bucket.

Sample code below:

Insert into REGIONS (REGION_ID,REGION_NAME) values (3,'Asia');
Insert into REGIONS (REGION_ID,REGION_NAME) values (4,'Middle East and Africa');

Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('CA','Canada',2);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('CH','Switzerland',1);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('CN','China',3);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('DE','Germany',1);

Step 5 → Replicate CDC log files (JSON Format) from Data Lake (AWS S3) to Snowflake Tables.

In this step we will establish a secure connection between Snowflake & AWS, then create the Tables, File Format, External Stages & Snowpipe. All these objects are required for the replication.

I have already published another blog on how to create this secure framework & enable the replication into Snowflake. Please go through it and create the objects as per the instructions.

Link to that blog → Snowflake — Replicate CDC log files (JSON Format) from Data Lake (AWS S3) to Snowflake Tables.

Note: In the above blog I explained the log file format and followed 4 different steps to achieve the replication task. Please make sure the object names are used consistently. Only a few changes are needed here, so I will highlight them below step by step.

Step 1: There is no need to create the S3 bucket again, and creating the folder structure in the S3 bucket is not required either, because we enabled dynamic partitioning and the folder prefix in the Amazon Kinesis Firehose configuration earlier.

Step 2: No changes, follow the same instructions.
Step 3: No changes, follow the same instructions.
Step 4: No changes, follow the same instructions.

In the above blog I also did some testing by uploading files manually to the S3 bucket folders. If you want, you can repeat those tests, or you can directly run some SQL commands on the MySQL database, which will automatically create the CDC log files and upload them to the bucket. Snowpipe will then load them into the Snowflake tables.

We have now completed step 5, which is the final step of the data pipeline. Now let's test the complete process.

Test the process flow:

Before we start our test cases, let's clean up the tables in both MySQL & Snowflake and also delete all the CDC log files from the S3 bucket.

— Erase all the data by truncating the tables in both MySQL & Snowflake.

truncate table REGIONS;

truncate table COUNTRIES;

— In the AWS console, navigate to the S3 bucket, go into each folder and delete all the files.

Now we will run some INSERT/UPDATE/DELETE SQL commands in MySQL and see the same changes appear in the Snowflake tables. For reference, use the code below & run it in the MySQL database.

Insert into REGIONS (REGION_ID,REGION_NAME) values (1,'Europe');
Insert into REGIONS (REGION_ID,REGION_NAME) values (2,'Americas');
Insert into REGIONS (REGION_ID,REGION_NAME) values (3,'Asia');
Insert into REGIONS (REGION_ID,REGION_NAME) values (4,'Middle East and Africa');

Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('AR','Argentina',2);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('AU','Australia',3);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('BE','Belgium',1);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('BR','Brazil',2);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('CA','Canada',2);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('CH','Switzerland',1);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('CN','China',3);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('DE','Germany',1);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('DK','Denmark',1);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('EG','Egypt',4);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('FR','France',1);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('IL','Israel',4);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('IN','India',3);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('IT','Italy',1);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('JP','Japan',3);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('KW','Kuwait',4);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('ML','Malaysia',3);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('MX','Mexico',2);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('NG','Nigeria',4);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('NL','Netherlands',1);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('SG','Singapore',3);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('UK','United Kingdom',1);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('US','United States of America',2);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('ZM','Zambia',4);
Insert into COUNTRIES (COUNTRY_ID,COUNTRY_NAME,REGION_ID) values ('ZW','Zimbabwe',4);


update COUNTRIES
set COUNTRY_NAME = 'USA'
where COUNTRY_ID='US';

update REGIONS
set REGION_NAME = 'America'
where REGION_ID=1;

delete from REGIONS where region_id=4;
delete from COUNTRIES where COUNTRY_ID in ('ZW','ZM');

After the data is changed in the MySQL database, the Binlog Streamer library captures those row-level changes & sends them to the Amazon Kinesis Data Stream.

Kinesis Firehose then pushes the change log data in JSON format into Amazon S3.

Once the files are available in the S3 bucket, Snowpipe loads the data into the Snowflake tables.

The above process takes some time, so please wait for it and then view the data in the Snowflake tables.

As I explained in the other blog, you can apply different SCD types & load the above JSON data into tabular format as per the project's needs.
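As a rough illustration of what turning the JSON log into tabular data involves, the sketch below replays a few events in plain Python and keeps only the latest version of each row, which corresponds to an SCD Type 1 style overwrite. It is not the Snowflake implementation from the other blog; the event layout (values, before_values, after_values) is an assumption about the log format, and the function name apply_event is my own.

```python
def apply_event(table_state, event, key_column):
    """Apply one CDC event to a dict of rows keyed by primary key."""
    kind = event["type"]
    row = event["row"]
    if kind == "WriteRowsEvent":
        values = row["values"]
        table_state[values[key_column]] = dict(values)
    elif kind == "UpdateRowsEvent":
        before, after = row["before_values"], row["after_values"]
        table_state.pop(before[key_column], None)  # handles key changes too
        table_state[after[key_column]] = dict(after)
    elif kind == "DeleteRowsEvent":
        table_state.pop(row["values"][key_column], None)
    else:
        raise ValueError(f"Unexpected event type: {kind}")
    return table_state


events = [
    {"type": "WriteRowsEvent", "row": {"values": {"REGION_ID": 1, "REGION_NAME": "Europe"}}},
    {"type": "WriteRowsEvent", "row": {"values": {"REGION_ID": 4, "REGION_NAME": "Middle East and Africa"}}},
    {"type": "UpdateRowsEvent", "row": {
        "before_values": {"REGION_ID": 1, "REGION_NAME": "Europe"},
        "after_values": {"REGION_ID": 1, "REGION_NAME": "America"}}},
    {"type": "DeleteRowsEvent", "row": {"values": {"REGION_ID": 4, "REGION_NAME": "Middle East and Africa"}}},
]

regions = {}
for event in events:
    apply_event(regions, event, key_column="REGION_ID")
print(regions)  # {1: {'REGION_ID': 1, 'REGION_NAME': 'America'}}
```

Events must be applied in the order they were captured; replaying them out of order would give a different final state.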

This concludes the end-to-end data pipeline process.

To Summarize:

  • I have used AWS & Snowflake cloud technologies here; you can use different cloud platforms to achieve the same.
  • The replication was configured to run every minute (60-second buffer interval configured in the Data Firehose); you can change this to another interval as needed.
  • If the number of changes in the source system is huge, you can also increase the Kinesis Data Stream shard count (1 shard is configured in the Data Stream), so that the stream can capture the logs in parallel.
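To get a feel for when more than one shard might be needed, here is a back-of-the-envelope sketch. The two limits are my assumption based on the write quotas AWS publishes per shard (1,000 records per second and 1 MB per second), so please check the current Kinesis documentation before sizing a real stream. Also keep in mind that records with the same partition key always land on the same shard, and the script in this article passes a fixed partition key, so it would need a varying key (for example the table name) before extra shards could help.

```python
import math

# Assumed per-shard write limits; verify against current AWS quotas.
MAX_RECORDS_PER_SHARD_PER_SEC = 1000
MAX_BYTES_PER_SHARD_PER_SEC = 1_000_000


def shards_needed(records_per_sec, avg_record_bytes):
    """Estimate the shard count from record rate and average record size."""
    by_count = math.ceil(records_per_sec / MAX_RECORDS_PER_SHARD_PER_SEC)
    by_bytes = math.ceil(
        records_per_sec * avg_record_bytes / MAX_BYTES_PER_SHARD_PER_SEC
    )
    return max(1, by_count, by_bytes)


print(shards_needed(500, 200))   # 1
print(shards_needed(2500, 200))  # 3
```

For the small test data in this article a single shard is more than enough.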

Note: If you have done this project on free tier accounts (Snowflake & AWS), don't forget to delete the objects we created during the learning process. Otherwise you will use up the free tier credits and then start paying for the services which are still up & running.

List of objects to delete/disable:

  • AWS RDS Instance.
  • AWS EC2 Machine
  • AWS S3 Bucket
  • AWS Kinesis Data Stream & Firehose
  • Snowflake Table Data
  • Snowpipe

Thank you for taking the time to read this article. I hope it helps you in understanding & configuring a data pipeline for CDC replication from OLTP to OLAP databases. I have tried to automate the process as much as possible. I'd love to hear your thoughts and insights, so don't hesitate to leave a comment.

About Me:

I work as a Cloud Data Engineer and love to write about AWS, Snowflake and other cloud offerings. I have been working on SQL, ETL/ELT, Data Warehouse/BI & cloud technologies like AWS & Snowflake.

I am an AWS Certified Solutions Architect & am working on earning Snowflake's SnowPro Core Certification as well.

Follow me on Medium to catch up on new articles on AWS & Snowflake cloud offerings. Feel free to connect with me on LinkedIn!

Shreyas MS

Data Engineer by Profession | Data & Cloud Enthusiast - Snowflake | AWS | Connect - linkedin.com/in/shreyas-ms-48661533