Snowflake — Replicate CDC log files (JSON Format) from Data Lake (AWS S3) to Snowflake Tables.

Shreyas MS
12 min readJul 26, 2023

Now a days many companies are moving their Data Warehousing & Business Intelligence(BI) solution into the cloud computing-based technologies.

Snowflake Data Cloud is a go to option for this because of it’s features & advantages like:

  • Zero infrastructure & management overhead.
  • Consumption-based pricing.
  • Unlimited compute and storage scalability
  • Native SQL Scripting capability.

and many others advanced features.

As a result many questions arise such as:

  • How to perform incremental data load & ensure reliable Change Data Capture process from on-premises or other OLTP database into the Snowflake Data Cloud
  • How to perform bulk data load into Snowflake

In this blogs we will discuss about one of the approach on how to perform the automated replication of Change Data Capture (CDC) log files from the Data Lake (which are getting dumped from Streaming Applications) into the Snowflake Tables.

For this task lets consider we already have Streaming Application in place which is capturing data change events from MySQL database and dumping those CDC log files into the Data Lake (AWS S3) in JSON Format data. Below is the sample file data for the reference.

{
"schema": "mysqlDB",
"table": "COUNTRIES",
"type": "WriteRowsEvent",
"row": {
"values": {
"COUNTRY_ID": "IND",
"COUNTRY_NAME": "India",
"REGION_ID": 1
}
}
}

{
"schema": "mysqlDB",
"table": "COUNTRIES",
"type": "DeleteRowsEvent",
"row": {
"values": {
"COUNTRY_ID": "AR",
"COUNTRY_NAME": "Argentina",
"REGION_ID": 5
}
}
}

{
"schema": "mysqlDB",
"table": "COUNTRIES",
"type": "UpdateRowsEvent",
"row": {
"before_values": {
"COUNTRY_ID": "USA",
"COUNTRY_NAME": "America",
"REGION_ID": 4
},
"after_values": {
"COUNTRY_ID": "USA",
"COUNTRY_NAME": "United States of America",
"REGION_ID": 4
}
}
}

Now lets understand the JSON Data before moving on to building the replication process.

  • schema: Logical view name of the Database
  • table: Represents the table name on which data got changed
  • type:
    → “WriteRowsEvent”, this represent the new row got inserted in the source(MySQL) and we have to consider this as newly Inserted data into the target
    → “DeleteRowsEvent”, this represent the row which got deleted in the source(MySQL) and we have to consider this as Deleted data and same needs to be deleted or updated as old records (SCD Type 2) based on the business requirement. So here for our use case we considering delete option.
    UpdateRowsEvent”, this represent the existing row got updated with new value in the source(MySQL) and we have to consider this as Updated data and same row need to be update in the target as well.
  • row: values: Column names of the table
    → Note: For the type ‘UpdateRowsEvent” the log file is also capturing before & after values as well.

Hope the above explanation about the log file is clear and now we can proceed to building Replication process.

We will use below services to achieve the replication process.

  • AWS S3 for Storage Purpose and Event Notification
  • AWS SQS (Simple Queue Service) to store & send the event notification to Snowflake Data Loader
  • Snowflake Tables to load the data.
  • Snowflake Secured Integration Object to establish the connection between Snowflake & AWS.
  • Snowflake File Format & External Stage which specify the file format and path to that file (S3 bucket).
  • Snowpipe (Server-less Loader) to perform the automatic data load as soon as the file available in the External Stage(S3).

Steps followed to achieve the replication process.

  1. Create an AWS S3 bucket and folder structure.
  2. Create the Secured Integration Framework between Snowflake & AWS.
  3. Create Snowflake objects like Tables, File Formats & External Stage.
  4. Create Snowpipe and enable the S3 event notification to SQS.

Step 1 → Create an AWS S3 bucket and folder structure.

In this step we will create a AWS s3 bucket and different folders to upload the log files w.r.t table CDC log files.

— In the AWS console, navigate to Amazon S3 create the S3 bucket. For more details click here.
— Just provide the unique value for bucket name and keep other configurations as-is and create it.

— Click on the bucket name and create the folder “mysql-cdc-data” as shown below.

— Click on the folder “mysql-cdc-data” and inside that folder create 2 more new folders with respect to table name as shown below.
Note: The folder name I am creating based on the table names on which CDC logs are getting captured. As part of this learning I am using 2 different tables.

We are done with this step1 and will move on to next steps.

Step2 → Create the Secured Integration Framework between Snowflake & AWS using Integration objects

In this step we will create a secure integration framework between Snowflake and AWS and use that to connect and read the log files available in the AWS S3 bucket.

I have already published one more blog on how to create this secure framework, please go through it and try to create it as per the instructions.

Link to that blog → Snowflake & AWS Secure Integration Framework via Integration Objects

Note: In the above blog I have used different S3 bucket and created Snowflake objects like Tables , Stages etc. with different names & format. Those we need to do change & use the object names, tables structure according to MySQL CDC to Snowflake replication Data Pipeline. So I will be highlighting those details below.

There are 5 different steps I have explained in that blog:

Step 1: No changes, follow the same instructions.

Step 2: No changes, follow the same instructions.

Step 3: No need to create the new S3 bucket and upload the sample files, instead we can use the new S3 bucket (i.e. mysql-cdc-logs-bucket-8504 ) which we created in the Step1 of this blog. But update the AWS IAM Policy with this new bucket name.

Step 4: Follow the same instructions but just use the different naming convention for storage integration object. Code below.

CREATE STORAGE INTEGRATION mysql_cdc_logs
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = ' **** ARN Detail ****'
STORAGE_ALLOWED_LOCATIONS = ('s3://mysql-cdc-logs-bucket-8504/');

Step 5: No changes, follow the same instructions.

After performing all the steps, please do the testing also which I mentioned in that blog by creating those objects and uploading the data files into the S3 bucket. If you are able to read and load the data then secured connection in place. You can either clean-up those testing objects which you created or you can leave it as-is. In further steps we will create different objects with different formats and use those actually.

So now we have secure connections between Snowflake & AWS, lets move onto next steps.

Step 3 → Create Snowflake objects like Tables, File Formats & External Stage.

In this step, we need to create below objects in the Snowflake Database. All the code required to create those objects are provided in this blog & also available in the GitHub Link. Click here

  • Snowflake Tables — To load the CDC log data
  • File Format — which indicates the type of file we are reading from S3
  • External Stage — which indicates the CDC logs file path.

You can use either use the Snowflake Web Interface or any other IDE’s to do it. Click here

Snowflake Tables → Since we are working on the JSON data format, we need to create a table with column which can handle JSON format. So we have to create the column which is of VARIANT datatype.

So below is the table structure what I am creating. Here I am using 2 different tables on which I am going to replicate the data.

CREATE or replace TABLE STG_REGIONS
(
REGIONS_DATA variant
);

CREATE or replace TABLE STG_COUNTRIES
(
COUNTRIES_DATA variant
);

File Format → It is named database object that can be used to simplify the process of accessing the staged data and streamlines loading data into and unloading data out of database tables. A Snowflake File format encapsulates information of data files, such as file type (CSV, JSON, etc.) and formatting options specific to each type used for bulk loading/unloading. In our case we are dealing with JSON data format, so I have used JSON as a type and creating the File Format object.

CREATE OR REPLACE FILE FORMAT JSON_FORMAT
TYPE = 'json';

External Stage → This specifies the location where data files are stored. While creating this object we need to specify the STORAGE_INTEGRATION object name which we created during the step2, URL is the AWS S3 bucket name & FILE_FORMAT which we created earlier.

CREATE or replace STAGE mysql_cdc_logs_stage
STORAGE_INTEGRATION = mysql_cdc_logs
URL = 's3://mysql-cdc-logs-bucket-8504/' -- Change the bucket name if you have created with different name in step 1
FILE_FORMAT = json_format;

After creating the named External Stage we can run the below script to list out all the files available in the AWS — S3 bucket.

list @mysql_cdc_logs_stage;

So we are done with step3 lets move on to final step.

Step 4 → Create Snowpipe & enable the S3 event notification to SQS.

In this step we will create the Snowpipe object (Server-less loader) which enables loading data from files as soon as they’re available in a stage.

This means we can load data from files in micro-batches, making it available at the target within minutes, rather than manually executing COPY statements to load the data from the file into the Snowflake tables. To know more about Snowpipe click here.

create or replace pipe MYSQL_CDC_LOGS_TBL_REGIONS
auto_ingest=true
as
copy into STG_REGIONS from
@mysql_cdc_logs_stage/mysql-cdc-data/TableName=REGIONS/;

create or replace pipe MYSQL_CDC_LOGS_TBL_COUNTRIES
auto_ingest=true
as
copy into STG_COUNTRIES from
@mysql_cdc_logs_stage/mysql-cdc-data/TableName=COUNTRIES/;

So if you notice I have created 2 different pipes one for each table and I am directly getting the data from the External Stage(S3) and performing the SQL MERGE activity. To know more about the MERGE function click here.

After creating the snowpipe we need to take the notification ARN of it and use that to enable the S3 event notification to the SQS.

Run the below script in the snowflake and make a note of the ARN value from the “notifications_channel” column in the output.

show pipes;

Note: This code will display all the snowpipes available in the schema and if you notice both the snowpipes have same ARN details this is because Snowflake designates no more than one SQS queue per S3 bucket. Since we are pointing both the pipes to same S3 bucket the ARN for both the pipes will be same.

Now we will create the S3 event notifications:

— In the AWS console, navigate to the S3 and click on the bucket name which we created.
— Navigate to Properties Tab and scroll-down you will find a section called “Event notifications”.
— Click on Create Event notifications & use the configuration mentioned below

  • Event name — enter the event name
  • Event types — select only Put option, so that only when the files uploaded to bucket the event notification will be triggered.
  • Destination — select SQS queue
  • Specify SQS queue — select enter SQS queue ARN then enter the ARN value which you made a note earlier from the Snowflake output window.

Rest all configuration leave it to the default and click on Save changes.

So now we are done with all the steps, lets test the flow by uploading some JSON file. You can find the sample JSON file which I have used in this blog from the GitHub link click here.

Test the process flow:

To upload the files to S3:

— In the AWS console navigate to S3, click on the S3 bucket which we created, navigate thru the folders.
— Upload the REGION table related CDC log files into the folder with name “TableName=REGIONS” and upload the COUNTRIES table related CDC log files into the folder with name “TableName=COUNTRIES”.

Once we upload the files to the S3 bucket, the event notification informs Snowpipe to load them into the target table defined in the pipe.

Note: It will take sometime around(1 or 2 mins) for the data to get reflected in the table.

Lets go back to the Snowflake and run the below codes to view the data loaded into the table.

select * from stg_regions; -- to view regions related data


select * from stg_countries; -- to view countries related data

Lets upload the another JSON file into the S3 bucket with log data of Update & Delete type for both the tables.

After uploading the new files, once again run the same script in Snowflake. The Snowpipe has loaded new file data into the table.

select * from stg_regions; -- to view regions related data


select * from stg_countries; -- to view countries related data

So we are able to load the data from the S3 bucket to snowflake table automatically once the file is available in the bucket.

The data which we load is on JSON format, we can covert that data into tabular format and load it into the another table. This can be achieved by using several methods like INSERT / UPDATE / MERGE SQL commands.

So here I am using the MERGE SQL commands to load the data into the final table and perform UPSERT operation. Sample code is given below.


CREATE or replace TABLE REGIONS
(
REGION_ID integer PRIMARY KEY,
REGION_NAME VARCHAR(50),
created_date TIMESTAMP_LTZ,
updated_date TIMESTAMP_LTZ
);


CREATE or replace TABLE COUNTRIES
(
COUNTRY_ID VARCHAR(50) PRIMARY KEY,
COUNTRY_NAME VARCHAR(50),
REGION_ID INT,
created_date TIMESTAMP_LTZ,
updated_date TIMESTAMP_LTZ
);


merge into REGIONS tgt
using
(
select $1:row:values:REGION_ID::integer as REGION_ID,$1:row:values:REGION_NAME::varchar as REGION_NAME, $1:type::varchar as cdc_type
from @mysql_cdc_logs_stage/mysql-cdc-data/TableName=REGIONS/ where $1:type::varchar <> 'UpdateRowsEvent'
union
select $1:row:after_values:REGION_ID::integer as REGION_ID,$1:row:after_values:REGION_NAME::varchar as REGION_NAME, $1:type::varchar as cdc_type
from @mysql_cdc_logs_stage/mysql-cdc-data/TableName=REGIONS/ where $1:type::varchar = 'UpdateRowsEvent'
) src
on tgt.REGION_ID = src.REGION_ID
when not matched and cdc_type = 'WriteRowsEvent' then
insert values(src.REGION_ID,src.REGION_NAME,CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
when matched and cdc_type = 'UpdateRowsEvent' then
update SET tgt.REGION_NAME = src.REGION_NAME,
tgt.updated_date = CURRENT_TIMESTAMP
when matched and cdc_type = 'DeleteRowsEvent' then
Delete
;


merge into COUNTRIES tgt
using
(
select $1:row:values:COUNTRY_ID::varchar as COUNTRY_ID,$1:row:values:COUNTRY_NAME::varchar as COUNTRY_NAME, $1:row:values:REGION_ID::integer as REGION_ID, $1:type::varchar as cdc_type
from stg_countries where $1:type::varchar <> 'UpdateRowsEvent'
union
select $1:row:after_values:COUNTRY_ID::varchar as COUNTRY_ID,$1:row:after_values:COUNTRY_NAME::varchar as COUNTRY_NAME, $1:row:after_values:REGION_ID::integer as REGION_ID, $1:type::varchar as cdc_type
from stg_countries where $1:type::varchar = 'UpdateRowsEvent'
) src
on tgt.COUNTRY_ID = src.COUNTRY_ID
when not matched and cdc_type = 'WriteRowsEvent' then
insert values(src.COUNTRY_ID,src.COUNTRY_NAME,REGION_ID,CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
when matched and cdc_type = 'UpdateRowsEvent' then
update SET tgt.COUNTRY_NAME = src.COUNTRY_NAME,
tgt.REGION_ID = src.REGION_ID,
tgt.updated_date = CURRENT_TIMESTAMP
when matched and cdc_type = 'DeleteRowsEvent' then
Delete
;

This concludes the End to End process of replicating data from Data Lake to Data warehouse.

To Summarize:

In this article we have created Snowflake objects using SQL worksheet of the Snowflake. Then created the secured connection between Snowflake & AWS, later we have created the automated process to load the CDC logs data from the S3 bucket into the Snowflake tables using Snowpipe.

About Me:

I am working as a Cloud Data Engineer and love to write about AWS & Snowflake and other cloud offerings. I have been working on SQL, ETL/ELT, DataWarehouse/BI & Cloud technologies like AWS & Snowflake.

I am AWS Certified Solution Architect & working on earning the Snowflake’s SnowPro Core Certification as well.

Follow me on Medium to catch up on new articles on AWS & Snowflake cloud offerings. Feel free to connect with me on LinkedIn !!!

--

--

Shreyas MS

Data Engineer by Profession | Data & Cloud Enthusiast - Snowflake | AWS | Connect - linkedin.com/in/shreyas-ms-48661533