Unlocking Efficient Data Loading: PostgreSQL to Snowflake with Change Data Capture
3.04.2024
PostgreSQL and Change Data Capture
Change Data Capture (CDC) is a data loading method that extracts record-level change events (inserts, updates and deletes). In a PostgreSQL database, this method can be implemented using logical replication.
Logical replication was primarily designed for replication between two PostgreSQL databases. For this purpose, it uses a publish and subscribe model. The publisher is the node where a publication is defined; a publication is a set of changes generated from a table or a group of tables, also described as a replication set. The node where a subscription is defined is referred to as the subscriber. Each subscription receives changes via one replication slot, a feature that tracks which changes have been delivered. The remote replication slot is created automatically when the subscription is created. Once logical replication is set up for a table, the changes on the publisher are sent to the subscriber in real time as they occur. When using the publish and subscribe model, PostgreSQL handles the replication process automatically.
The configuration is slightly different when using logical replication to load data from PostgreSQL into other target databases (e.g. Snowflake). In that case we only use the publisher side of PostgreSQL, and the replication slot must be created manually. Logical replication captures all database changes in the Write-Ahead Log (WAL) files. We can read the WAL files using PostgreSQL's system-defined functions and load the change records into Snowflake.
Preparing the PostgreSQL database
To activate logical replication, set the following parameters in the PostgreSQL configuration file, postgresql.conf:
# Setting wal_level to "logical" adds the information needed for logical decoding to the Write-Ahead Log (WAL).
wal_level = logical
# max_replication_slots should be equal to or greater than the number of connectors that use the WAL plus any additional replication slots used by your database.
max_replication_slots = 5
# max_wal_senders defines the maximum number of concurrent connections to the WAL. Set it to at least double the number of logical replication slots.
max_wal_senders = 10
To apply the modifications, you have to restart your PostgreSQL server.
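After the restart, you can optionally verify that the settings are active. Below is a minimal sketch using psycopg2; the connection string is a placeholder for your own environment.

import psycopg2

# Placeholder connection string: adjust host, database and user to your environment.
conn = psycopg2.connect("dbname=source_db user=repl_user host=pg-host")
with conn.cursor() as cur:
    for setting in ("wal_level", "max_replication_slots", "max_wal_senders"):
        # SHOW does not accept bind parameters, so the name is interpolated
        # from a fixed list of known settings.
        cur.execute("SHOW %s;" % setting)
        print(setting, "=", cur.fetchone()[0])
conn.close()

With logical replication enabled, we create two example tables and insert some sample data: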
CREATE TABLE table_1 (
id integer PRIMARY KEY,
description varchar(50)
);
CREATE TABLE table_2 (
id integer PRIMARY KEY,
description varchar(50)
);
INSERT INTO table_1 (id, description) VALUES (1, 'Orange'), (2, 'Apple');
INSERT INTO table_2 (id, description) VALUES (1, 'Fruit');
Create a publication with the following statement:
CREATE PUBLICATION pub_name FOR TABLE table_1;
or
CREATE PUBLICATION pub_name FOR ALL TABLES;
You can always add more tables to the existing publication with the ALTER command:
ALTER PUBLICATION pub_name ADD TABLE table_2;
Create a replication slot with the following statement:
SELECT * FROM pg_create_logical_replication_slot('rep_name', 'pgoutput');
The first parameter of the function is the name of the replication slot. The second parameter is the plugin. PostgreSQL comes with two plugins preinstalled:
- test_decoding: outputs textual representations of change events
- pgoutput: outputs binary representations of change events
Note that the test_decoding plugin is intended for testing purposes only. You should use the pgoutput plugin in your actual data load job.
Since the pgoutput plugin returns data in binary format, it is necessary to include decoding functions in the loading job. In our case, we created Python functions in Snowflake. Our functions were developed with the help of this code.
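To illustrate what such decoding functions do, here is a minimal sketch of a pgoutput decoder in plain Python (not the exact functions we deployed in Snowflake). It only recognises the most common message types of protocol version 1 and assumes column values arrive in the text format; the function names are illustrative.

import struct

def decode_tuple_data(msg: bytes, offset: int):
    # TupleData: Int16 column count, then per column a kind byte:
    # 'n' = NULL, 'u' = unchanged TOASTed value, 't' = text value (Int32 length + bytes).
    (ncols,) = struct.unpack_from("!H", msg, offset)
    offset += 2
    values = []
    for _ in range(ncols):
        kind = msg[offset:offset + 1]
        offset += 1
        if kind in (b"n", b"u"):
            values.append(None)
        elif kind == b"t":
            (length,) = struct.unpack_from("!I", msg, offset)
            offset += 4
            values.append(msg[offset:offset + length].decode("utf-8"))
            offset += length
    return values

def decode_pgoutput_message(msg: bytes):
    # The first byte identifies the message type.
    kind = msg[0:1]
    if kind == b"B":  # Begin: final LSN, commit timestamp, transaction id
        final_lsn, commit_ts, xid = struct.unpack_from("!QQI", msg, 1)
        return {"type": "begin", "final_lsn": final_lsn, "xid": xid}
    if kind == b"C":  # Commit: flags, commit LSN, end LSN, commit timestamp
        _flags, commit_lsn, end_lsn, commit_ts = struct.unpack_from("!BQQQ", msg, 1)
        return {"type": "commit", "commit_lsn": commit_lsn, "end_lsn": end_lsn}
    if kind == b"R":  # Relation: table metadata (namespace, name, columns)
        (rel_oid,) = struct.unpack_from("!I", msg, 1)
        return {"type": "relation", "relation_oid": rel_oid}
    if kind == b"I":  # Insert: relation OID, 'N' marker, new tuple
        (rel_oid,) = struct.unpack_from("!I", msg, 1)
        return {"type": "insert", "relation_oid": rel_oid,
                "values": decode_tuple_data(msg, 6)}
    if kind in (b"U", b"D"):  # Update / Delete: old-tuple variants omitted in this sketch
        (rel_oid,) = struct.unpack_from("!I", msg, 1)
        return {"type": "update" if kind == b"U" else "delete", "relation_oid": rel_oid}
    return {"type": "other", "message_type": kind.decode()}

A decoder along these lines is applied to every row of binary change data; the relation ('R') messages supply the table and column names needed to interpret the insert, update and delete tuples.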
To see the content of the replication slot, you can execute:
SELECT pg_logical_slot_peek_binary_changes('rep_name', NULL, NULL, 'publication_names', 'pub_name', 'proto_version', '1');
Note that this function only returns the change data but does not consume the replication slot. To consume the replication slot, execute:
SELECT pg_logical_slot_get_binary_changes('rep_name', NULL, NULL, 'publication_names', 'pub_name', 'proto_version', '1');
Once the replication slot is consumed, you cannot access changes that happened before executing the pg_logical_slot_get_binary_changes function. To consume the replication slot only up to a certain log sequence number (LSN), you can set the second parameter of the function. The most commonly used value is the current WAL LSN, which represents the current write-ahead log (WAL) location. You can retrieve it with:
SELECT pg_current_wal_lsn();
In the ELT process you can save the pg_current_wal_lsn value in a variable named current_wal_lsn to ensure consistency within a particular job run. With this variable you can execute the following system functions, which return changes only up to current_wal_lsn.
SELECT pg_logical_slot_peek_binary_changes('rep_name', '<current_wal_lsn>', NULL, 'publication_names', 'pub_name', 'proto_version', '1');
SELECT pg_logical_slot_get_binary_changes('rep_name', '<current_wal_lsn>', NULL, 'publication_names', 'pub_name', 'proto_version', '1');
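In our setup these calls are issued from the job itself. The sketch below shows the pattern with psycopg2; the connection string and the load_to_snowflake helper are placeholders, not part of the actual implementation.

import psycopg2

def load_to_snowflake(rows):
    # Placeholder: in the real job this appends the binary change rows to the Snowflake extract table.
    print(f"loaded {len(rows)} change records")

conn = psycopg2.connect("dbname=source_db user=repl_user host=pg-host")
conn.autocommit = True
with conn.cursor() as cur:
    # Freeze the upper bound for this job run.
    cur.execute("SELECT pg_current_wal_lsn();")
    current_wal_lsn = cur.fetchone()[0]

    # Non-destructive read: the slot keeps the changes if a later task fails.
    cur.execute(
        """SELECT lsn, xid, data
           FROM pg_logical_slot_peek_binary_changes(
                'rep_name', %s, NULL,
                'publication_names', 'pub_name', 'proto_version', '1');""",
        (current_wal_lsn,),
    )
    changes = cur.fetchall()
    load_to_snowflake(changes)

    # Only after a successful load is the slot consumed, up to the same LSN.
    cur.execute(
        """SELECT count(*)
           FROM pg_logical_slot_get_binary_changes(
                'rep_name', %s, NULL,
                'publication_names', 'pub_name', 'proto_version', '1');""",
        (current_wal_lsn,),
    )
conn.close()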
Deciding on the number of publications
You can either create one publication for each table or add multiple tables to one publication, depending on your specific needs. It is important to be aware that a replication slot only follows publications that existed at the time of its creation.
Therefore, a separate publication for each table is not a good solution when you need to be able to add tables to the replication later on: you would have to recreate the replication slot, and the change data accumulated in it would be lost.
If you do not intend to add tables in the future, the benefit of having several publications is that you can retrieve change data for specific publications only. If you have a single publication for multiple tables, you can still access the changes per table after decoding the binary values.
ELT job architecture
We created one publication and one replication slot in the source PostgreSQL database to keep the option of adding tables later. In the Snowflake target database, we created:
- the extract (EX) table,
- Python functions for decoding binary data,
- a view that uses the Python functions to decode the binary data,
- stage (STG) tables with initial data.
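As a rough illustration of this setup, the sketch below creates an extract table and a decoding view with the Snowflake Python connector. All object names (EX_PG_CHANGES, V_PG_CHANGES_DECODED, the DECODE_PGOUTPUT UDF) and the connection parameters are assumptions, not the names used in the actual project.

import snowflake.connector

# Placeholder connection parameters.
sf = snowflake.connector.connect(account="my_account", user="etl_user",
                                 password="***", warehouse="ETL_WH",
                                 database="CDC_DB", schema="RAW")
cur = sf.cursor()

# Extract table: raw pgoutput messages in binary form plus load metadata.
cur.execute("""
    CREATE TABLE IF NOT EXISTS EX_PG_CHANGES (
        LSN     VARCHAR,
        XID     NUMBER,
        DATA    BINARY,
        LOAD_TS TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
    );
""")

# Decoding view: DECODE_PGOUTPUT stands for the Python UDFs that turn the
# binary payload into readable JSON (see the decoder sketch above).
cur.execute("""
    CREATE OR REPLACE VIEW V_PG_CHANGES_DECODED AS
    SELECT LSN, XID, LOAD_TS, DECODE_PGOUTPUT(DATA) AS CHANGE_JSON
    FROM EX_PG_CHANGES;
""")
cur.close()
sf.close()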
We designed the ELT jobs with the following tasks:


In the main job (Figure 1), we load the data to Snowflake using the pg_logical_slot_peek_binary_changes function to ensure the replication slot is not consumed in case of task failure (task Append Extract). The data is loaded to Snowflake in binary format.

We want to keep the changes available for longer than one job run. Hence, we only append new data to the extract table and delete the records older than five days at the end of the job (Main job, task Delete Ex Last Data). This gives us enough buffer in case any issue arises.
In the next step, we materialise the decoding view for performance, as it is used repeatedly in the load job (Acar Load loop, main job). The data is returned in a human-readable format (JSON). In addition, column_names are parsed as strings containing the column names and data types; these are used in the ELT process to define the select statements (as a variable in the load job).

When inserting data into the STG table, we use a condition that selects only data newer than the latest commit date already present in the stage (Load job, task Insert INTO STG).
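A hedged sketch of this load step is shown below. The materialised view MAT_PG_CHANGES_DECODED, the STG table and the JSON attributes exposed by the decoding functions (table_name, commit_ts and the column values) are all assumed names for illustration.

import snowflake.connector

sf = snowflake.connector.connect(account="my_account", user="etl_user",
                                 password="***", warehouse="ETL_WH",
                                 database="CDC_DB", schema="STG")
cur = sf.cursor()

# Insert only changes committed after the latest commit already loaded into the stage,
# so re-running the load job does not duplicate records.
cur.execute("""
    INSERT INTO STG_TABLE_1 (ID, DESCRIPTION, COMMIT_TS)
    SELECT CHANGE_JSON:id::NUMBER,
           CHANGE_JSON:description::VARCHAR,
           CHANGE_JSON:commit_ts::TIMESTAMP_NTZ
    FROM MAT_PG_CHANGES_DECODED
    WHERE CHANGE_JSON:table_name::VARCHAR = 'table_1'
      AND CHANGE_JSON:commit_ts::TIMESTAMP_NTZ >
          (SELECT COALESCE(MAX(COMMIT_TS), '1900-01-01'::TIMESTAMP_NTZ) FROM STG_TABLE_1);
""")
cur.close()
sf.close()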
At the end of the job, we consume the replication slot with the pg_logical_slot_get_binary_changes function (Main job, task Consume replication slot). You must be careful to consume the slot consistently, because the WAL retained for an unconsumed slot can fill up the disk.
Conclusion
With the above implementation, the client can specify the required source tables, and the whole process runs automatically regardless of the number of source tables. In this example, we did not use the Matillion CDC option but built a native solution using PostgreSQL, Snowflake and Matillion capabilities. Let us know if you find the solution interesting, and we can share more details.
