Leveraging Change Data Capture Features when Loading Data from Relational Databases


Change Data Capture in the Analytical Architecture

Change Data Capture (CDC) features are usually used for database replication or real-time data loads. We used them to reduce the load time from the source databases to the central data warehouse on Snowflake, which in turn lowered the cost. At the same time, we avoided additional overhead on the operational databases because the incremental loads read the captured changes rather than the actual source tables.

We used the MatillionETL platform, which over the years has become one of the leading ELT platforms for cloud data platforms on the market. One disadvantage of this tool is that its architecture is based on a virtual machine, so the costs are proportional to the hours the virtual machine runs. Although Matillion introduced the Matillion Productivity Cloud, a complete SaaS solution, to solve these challenges, the main advantage of our CDC-based solution, a very efficient load, remains.

We verified our solution using the Microsoft SQL Server and PostgreSQL relational databases. They both provide the ability to track changes on source tables:

  • Microsoft SQL Server implements CDC as change tables that correspond to the original source tables, where each row represents one change (insert, update or delete). The change tables are stored in a separate schema named “cdc”, as described in the Microsoft SQL Server documentation.
  • PostgreSQL has the Logical Replication feature. It includes publications and replication slots, which track changes in the write-ahead log files. These files contain data in binary format, as described in the PostgreSQL documentation. Because the binary format is not human-readable, the load job has to include decoding functions. In our case, these functions are implemented in Python and stored in Snowflake.
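
To give a feel for what such a decoding function does, here is a minimal sketch that decodes one message type. It assumes the built-in pgoutput output plugin and its Begin message layout (a one-byte tag, the final LSN, the commit timestamp in microseconds since 2000-01-01, and the transaction ID). The article does not say which plugin or message types are actually decoded, and the function name `decode_begin` is hypothetical.

```python
import struct
from datetime import datetime, timedelta, timezone

# PostgreSQL timestamps count microseconds from 2000-01-01 UTC.
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)


def decode_begin(payload: bytes) -> dict:
    """Decode a pgoutput Begin message (assumed layout, see lead-in)."""
    # Big-endian: 1-byte tag, unsigned 64-bit LSN, signed 64-bit timestamp,
    # unsigned 32-bit transaction ID. The payload must be exactly 21 bytes.
    tag, final_lsn, commit_ts, xid = struct.unpack(">cQqI", payload)
    if tag != b"B":
        raise ValueError(f"not a Begin message: {tag!r}")
    return {
        # LSNs are conventionally printed as two hex halves, e.g. 1/2A.
        "final_lsn": f"{final_lsn >> 32:X}/{final_lsn & 0xFFFFFFFF:X}",
        "commit_time": PG_EPOCH + timedelta(microseconds=commit_ts),
        "xid": xid,
    }
```

A real decoder would also need handlers for the relation, insert, update and delete messages, which carry the table metadata and the row data.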

With the CDC features activated on the source databases, we made the loads not only as efficient as possible but also as automated as possible. We developed a default load job structure that consists of two jobs: the “main” job and the “load” job. Every source database (connection) has its own set of these two jobs, which are developed at the database level, not at the table level.

In the “main” job we define the tables included in the load by querying the information schema in the source database to retrieve the tables that have the CDC option activated (Microsoft SQL Server) or the tables that were added to the publication (PostgreSQL). As a result, a table is added to the load job automatically as soon as CDC is activated for it in the source database.
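
The discovery step could look roughly like the sketch below. The catalog views used here (`sys.tables` with its `is_tracked_by_cdc` flag in SQL Server, and `pg_publication_tables` in PostgreSQL) are an assumption about where this information is read from. The `%(publication)s` placeholder assumes a psycopg-style driver, and the names `DISCOVERY_SQL` and `tables_to_load` are hypothetical.

```python
# Catalog queries that list the tables currently tracked for change capture.
DISCOVERY_SQL = {
    "sqlserver": (
        "SELECT s.name AS schema_name, t.name AS table_name "
        "FROM sys.tables AS t "
        "JOIN sys.schemas AS s ON s.schema_id = t.schema_id "
        "WHERE t.is_tracked_by_cdc = 1"
    ),
    "postgresql": (
        "SELECT schemaname AS schema_name, tablename AS table_name "
        "FROM pg_publication_tables "
        "WHERE pubname = %(publication)s"
    ),
}


def tables_to_load(rows):
    """Turn (schema, table) result rows into sorted, de-duplicated names."""
    return sorted({f"{schema}.{table}" for schema, table in rows})
```

The result of `tables_to_load` is the list that the loop over the “load” job would iterate over.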

Once the tables included in the load are defined, the “main” job loops over them. Every table is passed to the “load” job as a variable, and the loop can be defined with either parallel or sequential processing.
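
In plain Python, the same pattern can be sketched as follows. This is an analogy to the orchestration loop, not Matillion's implementation. The function `run_load_loop`, its parameters and the `load_table` callable are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def run_load_loop(tables, load_table, parallel=False, max_workers=4):
    """Call load_table once per table and return {table: result}."""
    tables = list(tables)
    if not parallel:
        # Sequential mode: one table after another, in the given order.
        return {table: load_table(table) for table in tables}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() preserves input order, so results line up with tables.
        return dict(zip(tables, pool.map(load_table, tables)))
```

Sequential processing keeps the pressure on the source database low, while parallel processing shortens the total run time when the tables are independent of each other.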

Figure 1 – Main Job

The “load” job consists of two branches: initial and incremental. First, the job determines whether the target table already exists in Snowflake. Based on this information the appropriate branch is chosen:

  • If the table doesn’t already exist, the initial load branch is chosen. A new table in Snowflake is created. This is the one time we load the full data from the actual source table to the Snowflake table.
  • If the table already exists, the incremental load branch is chosen. This branch loads the increment (the changes) to a table in the EX (extract) schema, checks for schema changes by comparing it to the table in the STG schema (the main schema), and then loads the data from EX to STG in append-only mode. In append-only mode all DML operations are inserts, so historical data is preserved, which is very helpful for understanding the source and for debugging. After loading to the STG schema we also gather some statistics about the source and target tables so that basic figures such as counts and dates can be compared.
Figure 2 – Load Job
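
The branch selection and the incremental step can be sketched as below. This is a simplified illustration under several assumptions: columns are given as `{name: type}` dictionaries, a schema change is handled by adding the missing column to the STG table, and identifiers need no quoting. The function names are hypothetical.

```python
def choose_branch(target_exists: bool) -> str:
    """Pick the load branch based on whether the target table exists."""
    return "incremental" if target_exists else "initial"


def plan_incremental(table, ex_columns, stg_columns):
    """Return the SQL statements for one incremental, append-only load.

    ex_columns and stg_columns map column names to data types for the
    EX and STG versions of the table.
    """
    statements = []
    # Schema change check: columns present in EX but missing in STG.
    for name, col_type in ex_columns.items():
        if name not in stg_columns:
            statements.append(
                f"ALTER TABLE STG.{table} ADD COLUMN {name} {col_type}"
            )
    # Append-only: changes are only ever inserted, never updated or deleted.
    cols = ", ".join(ex_columns)
    statements.append(
        f"INSERT INTO STG.{table} ({cols}) SELECT {cols} FROM EX.{table}"
    )
    return statements
```

For example, `plan_incremental("ORDERS", {"ID": "NUMBER", "NOTE": "VARCHAR"}, {"ID": "NUMBER"})` first adds the `NOTE` column to `STG.ORDERS` and then appends the extracted rows.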

At every crucial step in the load job, we send an email notification in case of failure. We decided not to send emails on successful completion, except when new tables are created in the initial load.

Central Data Warehouse Architecture on Snowflake

On Snowflake we designed the following architecture for this approach:

  • Every source gets its own database named SRC_<actual source name>
  • The database contains schemas intended for load (MatillionETL)
  • There is one more schema, named <actual source name>, where the data is prepared for analytical usage. In this schema, the technical (CDC) columns are omitted, the data is qualified by the most current date so that only the latest version of each row is kept (if history is not needed; otherwise VALID_FROM and VALID_TO information is added), dates are converted to the desired time zone, and so on. We left this step manual because it is customized per source and user requirements.
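
The logic of the "most current" preparation step can be illustrated in plain Python. This is only a sketch of the idea: the column names `ID`, `LOADED_AT` and `CDC_OPERATION` and the operation value `"delete"` are hypothetical, and in Snowflake the same result would more typically be expressed as a view over the append-only STG table, for instance with a `QUALIFY ROW_NUMBER() OVER (...) = 1` filter.

```python
def current_rows(changes, key="ID", ts="LOADED_AT", op="CDC_OPERATION"):
    """Keep the newest change per key and drop the technical columns.

    Keys whose newest change is a delete are left out of the result.
    """
    latest = {}
    for row in changes:
        seen = latest.get(row[key])
        if seen is None or row[ts] >= seen[ts]:
            latest[row[key]] = row
    return [
        {k: v for k, v in row.items() if k not in (ts, op)}
        for row in latest.values()
        if row[op] != "delete"
    ]
```

Because the STG tables are append-only, every earlier version of a row stays available, so a history-preserving variant with VALID_FROM and VALID_TO can be derived from the same data.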

We implemented the reporting structures with the Data Mesh approach. The reporting databases are created according to user requirements, and Snowflake role-based security determines access to the source products. We wanted to keep the BI models and flat tables for final business users as flexible as possible, so most of them are implemented as views, with automated materialization available for complex queries. For the BI models we selected the Kimball data modelling approach.

To automate the data quality checks, we developed data quality procedures on Snowflake which check data accuracy and that the data structure is consistent with the model, since the fact and dimension data structures are changed frequently according to business needs. For the users who need simple data checks but are not familiar with SQL, we developed Snowflake dashboards and roles with access only to these dashboards but not the actual databases.
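
A structure check of this kind boils down to comparing the columns the model expects with the columns that actually exist. The sketch below shows that comparison in Python for illustration only. The procedures described above run on Snowflake, and the function name `structure_issues` and the `{column: type}` input format are hypothetical.

```python
def structure_issues(model, actual):
    """Compare expected {column: type} with actual {column: type}."""
    issues = []
    for col, expected in model.items():
        if col not in actual:
            issues.append(f"missing column {col}")
        elif actual[col] != expected:
            issues.append(f"{col}: expected {expected}, found {actual[col]}")
    # Columns that exist in the table but are not part of the model.
    issues += [f"unexpected column {col}" for col in actual if col not in model]
    return issues
```

An empty list means that the fact or dimension table matches the model. Any other result can be written to a log table or surfaced in a dashboard.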

Benefits of the Solution

  • The main benefit of this solution is that incremental loads usually run for only a few minutes.
  • When a user wants to add new tables from the source system to the load job, they only need to configure CDC in the source for the chosen tables. Neither the MatillionETL job nor Snowflake has to be reconfigured. New columns do not need to be added manually either, because they are added automatically.
  • Adding a new source system based on a Microsoft SQL Server or PostgreSQL database requires minimal work. An existing job in MatillionETL and a database in Snowflake can be copied and adjusted by changing the names and the connections.

Mela Malej
Consultant and Data Engineer