When implementing Change Data Capture (CDC) for data pipelines, it is common to rely on CDC-specific metadata, such as last_updated timestamp columns or database logs. However, when such metadata or logs are unavailable, an alternative is to perform a full table comparison using hashing. This method provides a reliable, tool-agnostic way to identify changes efficiently without modifying the source schema or requiring third-party tools.
In this article, we explore how to use full table comparison with hashing to sync data from an SAP HANA source table to Snowflake, using Snowflake as both the staging and target database. The approach scales to large datasets, such as tables with 10 million rows.
The workflow consists of five steps:
1. Extract Data from SAP HANA to a Snowflake Staging Table: Load a full snapshot of the source table into Snowflake.
2. Generate Row-Level Hashes: Compute a hash for each row in the staging table.
3. Compare Hashes with Previous Snapshot: Compare staging hashes against the hashes stored in the target table.
4. Identify Changes: Classify rows as inserts, updates, or deletes.
5. Load Changes into the Target Table: Apply the identified changes with Snowflake MERGE statements.
Step 1: Extract Data from SAP HANA to Snowflake Staging Table
Use a data integration tool or script (e.g., Python or ETL tools) to extract data from SAP HANA and load it into a Snowflake staging table.
SQL to Create Staging Table in Snowflake:
CREATE OR REPLACE TABLE staging_table (
    id INT,
    column1 STRING,
    column2 STRING,
    column3 STRING,
    load_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Sample Data Load Workflow:
import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
from sqlalchemy import create_engine

# SAP HANA connection (requires the sqlalchemy-hana dialect)
sap_engine = create_engine('hana+pyhdb://user:password@hostname:port')
data = pd.read_sql('SELECT * FROM source_table', con=sap_engine)

# Snowflake connection
conn = snowflake.connector.connect(
    user='user', password='password', account='account',
    warehouse='warehouse', database='database', schema='schema'
)

# Load the extracted rows into the staging table
write_pandas(conn, data, 'STAGING_TABLE')
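For very large extracts, loading through staged files is usually faster than write_pandas. A minimal sketch, assuming the extract has already been written to a local CSV file; the stage name and file path are illustrative:
CREATE OR REPLACE STAGE cdc_stage;
-- PUT runs from SnowSQL or a connector session (not the web worksheet);
-- it uploads the file and gzip-compresses it by default
PUT file:///tmp/source_table.csv @cdc_stage;
COPY INTO staging_table (id, column1, column2, column3)
FROM @cdc_stage
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);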
Step 2: Generate Row-Level Hashes in the Staging Table
Generate a hash for each row in the staging table to detect changes efficiently.
SQL to Add Row Hashes in Snowflake:
ALTER TABLE staging_table ADD COLUMN row_hash STRING;

-- COALESCE prevents a single NULL column from making the whole CONCAT (and hash) NULL
UPDATE staging_table
SET row_hash = MD5(CONCAT(
    COALESCE(column1, ''), '|',
    COALESCE(column2, ''), '|',
    COALESCE(column3, '')
));
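Snowflake's built-in HASH function is a lighter-weight alternative: it accepts multiple arguments and handles NULLs natively, so no COALESCE or delimiter is needed. The trade-off is a non-cryptographic 64-bit result, so collisions are somewhat more likely than with MD5. A minimal sketch:
-- HASH returns a signed 64-bit number; cast it to fit the STRING column
UPDATE staging_table
SET row_hash = TO_VARCHAR(HASH(column1, column2, column3));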
Step 3: Compare Hashes with Previous Snapshot in the Target Table
Use the row hashes to identify inserts, updates, and deletes by comparing the staging and target tables.
SQL to Create Target Table in Snowflake:
CREATE OR REPLACE TABLE target_table (
    id INT,
    column1 STRING,
    column2 STRING,
    column3 STRING,
    row_hash STRING,
    load_date TIMESTAMP
);
SQL to Identify Changes:
-- Identify inserts: staging rows whose id does not exist in the target yet
-- (joining on id rather than row_hash, so updated rows are not misclassified as inserts)
SELECT s.*
FROM staging_table s
LEFT JOIN target_table t ON s.id = t.id
WHERE t.id IS NULL;

-- Identify updates: same id, different row hash
SELECT s.*
FROM staging_table s
INNER JOIN target_table t ON s.id = t.id
WHERE s.row_hash != t.row_hash;

-- Identify deletes: target rows whose id no longer exists in staging
SELECT t.*
FROM target_table t
LEFT JOIN staging_table s ON t.id = s.id
WHERE s.id IS NULL;
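Before merging, it can help to sanity-check the change volumes. A small summary query built from the same joins as above:
SELECT
    (SELECT COUNT(*) FROM staging_table s
     LEFT JOIN target_table t ON s.id = t.id WHERE t.id IS NULL)          AS inserts,
    (SELECT COUNT(*) FROM staging_table s
     INNER JOIN target_table t ON s.id = t.id WHERE s.row_hash != t.row_hash) AS updates,
    (SELECT COUNT(*) FROM target_table t
     LEFT JOIN staging_table s ON t.id = s.id WHERE s.id IS NULL)         AS deletes;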
Step 4: Apply Changes Using Snowflake MERGE
Use Snowflake’s MERGE statement to load the identified changes into the target table.
Snowflake MERGE Example:
MERGE INTO target_table AS target
USING staging_table AS staging
    ON target.id = staging.id
WHEN MATCHED AND target.row_hash != staging.row_hash THEN UPDATE SET
    target.column1 = staging.column1,
    target.column2 = staging.column2,
    target.column3 = staging.column3,
    target.row_hash = staging.row_hash,
    target.load_date = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN INSERT (id, column1, column2, column3, row_hash, load_date)
    VALUES (staging.id, staging.column1, staging.column2, staging.column3,
            staging.row_hash, CURRENT_TIMESTAMP);
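Note that this MERGE covers inserts and updates only; Snowflake's MERGE has no WHEN NOT MATCHED BY SOURCE clause, so the deletes identified in Step 3 need a separate statement. A minimal sketch, assuming id is non-nullable:
-- Remove target rows that no longer exist in the staging snapshot
DELETE FROM target_table
WHERE id NOT IN (SELECT id FROM staging_table);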
Optimizations for Large Datasets
1. Batch Processing: Process the table in chunks so individual queries stay manageable (a chunked hash update is sketched after this list), for example:
SELECT * FROM staging_table WHERE id BETWEEN 1 AND 1000000;
2. Parallel Processing: Run extraction and loading for independent chunks concurrently.
3. Incremental Loads: Where a reliable key or partition range exists, restrict the comparison to the ranges that could have changed instead of the full table.
4. Efficient Hash Calculation: Use Snowflake's MD5 or SHA functions directly in SQL to minimize processing overhead.
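As a concrete example of batch processing, the hash computation itself can be chunked over the key range; the 1,000,000-row window here is illustrative:
-- Compute hashes one id range at a time; repeat with the next range
UPDATE staging_table
SET row_hash = MD5(CONCAT(COALESCE(column1, ''), '|',
                          COALESCE(column2, ''), '|',
                          COALESCE(column3, '')))
WHERE id BETWEEN 1 AND 1000000;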
Benefits of This Approach
1. Efficiency: Comparing a single hash column is far cheaper than comparing every column of every row.
2. Flexibility: Works without CDC metadata, database logs, or changes to the source schema.
3. Accuracy: Any change in any compared column alters the row hash, so modifications are reliably detected.
Full table comparison with hashing is a powerful method for CDC, especially for large datasets such as tables with 10 million rows. By leveraging Snowflake’s SQL capabilities for staging, hashing, and merging, this approach provides an efficient, scalable, and accurate way to manage data synchronization. It is ideal for organizations prioritizing cost-effectiveness and high performance without relying on complex third-party tools or database logs.