1. Databricks Migration

Overview

This case study describes the migration of a large-scale, globally managed SQL/Databricks data platform to a modern, team-specific architecture using Airflow and dbt. The project aimed to improve data pipeline structure, ownership, and maintainability by moving away from manual SQL scheduling and central management.

The Challenge

The legacy platform relied on hundreds of SQL scripts scheduled and managed manually in Databricks, with little documentation or clear ownership. The system had grown organically over years, creating operational challenges:

  • Manual Scheduling: SQL jobs were triggered and maintained by hand, leading to operational risk and lack of transparency

  • Tangled Dependencies: Pipelines grew without clear structure, resulting in unclear data lineage and brittle interdependencies

  • Unclear Ownership: Teams struggled to identify responsibility for specific datasets and workflows

  • Limited Visibility: No centralized monitoring or alerting for pipeline failures

  • Difficult Maintenance: Changes required deep system knowledge, creating bottlenecks and slowing development

Migration Strategy

The migration followed a phased approach, prioritizing business-critical datasets and establishing patterns for subsequent migrations.

Phase 1: Core Dataset Migration

The first phase focused on moving foundational datasets that other pipelines depended on:

  • Migrated Google Analytics and Salesforce data ingestion to Airflow DAGs

  • Rebuilt transformation logic as dbt incremental models

  • Implemented data quality tests to validate consistency with legacy outputs

  • Ran parallel pipelines during the transition period to verify results (see the comparison sketch after this list)

  • Established monitoring and alerting for new workflows
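
A minimal sketch of the parallel-run comparison used during the transition, assuming hypothetical table names for the legacy output and the rebuilt dbt model (the real checks ran against the production tables):

# Assumes both outputs share the same schema so set differences are meaningful.
legacy_df = spark.table("legacy.salesforce_opportunities_daily")      # hypothetical legacy output
rebuilt_df = spark.table("analytics.stg_salesforce__opportunities")   # hypothetical rebuilt dbt model output

# Row counts should match within an agreed tolerance.
print(f"legacy={legacy_df.count()}, rebuilt={rebuilt_df.count()}")

# Rows present in one output but not the other point to drifted transformation logic.
print(f"only_in_legacy={legacy_df.exceptAll(rebuilt_df).count()}")
print(f"only_in_rebuilt={rebuilt_df.exceptAll(legacy_df).count()}")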

Phase 2: Dependent Pipeline Migration

With core datasets stable, the team migrated downstream analytics pipelines:

  • Mapped pipeline dependencies to determine the migration sequence (see the dependency-ordering sketch after this list)

  • Converted SQL transformations to dbt models with proper lineage documentation

  • Implemented dbt tests for data quality and referential integrity

  • Updated downstream consumers to point to new tables

  • Decommissioned legacy pipelines only after a validation period
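
Dependency mapping lends itself to a simple topological ordering once the pipeline graph is written down. A minimal sketch with illustrative pipeline names (the real dependency map was derived from the legacy SQL scripts):

from graphlib import TopologicalSorter  # Python 3.9+

# Each pipeline lists the upstream pipelines it depends on (illustrative names only).
dependencies = {
    "ga_sessions_ingest": set(),
    "salesforce_ingest": set(),
    "revenue_mart": {"salesforce_ingest"},
    "marketing_attribution": {"ga_sessions_ingest", "revenue_mart"},
}

# Upstream pipelines come first in the order, which is also a safe migration sequence.
print(list(TopologicalSorter(dependencies).static_order()))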

Phase 3: Team Enablement

The final phase focused on organizational change and knowledge transfer:

  • Restructured repositories by team ownership and domain

  • Delivered training sessions on Airflow and dbt best practices

  • Established CI/CD pipelines for automated testing and deployment

  • Created documentation and runbooks for common operations

  • Assigned team ownership for ongoing maintenance and enhancements

Salesforce History Migration Challenge

During the Salesforce migration, a critical challenge emerged: the legacy Salesforce instance was migrated to a new server, regenerating all entity IDs. History tables containing audit trails remained in the old data warehouse with references to obsolete IDs, breaking the connection between current records and their change history.

Problem Analysis

The Salesforce instance migration created a data continuity problem:

  • All main entity tables (accounts, opportunities, cases, invoices, products) received new IDs

  • Salesforce history tables tracked field changes over time using old entity IDs

  • History tables were not migrated by Salesforce, leaving them orphaned in the data warehouse

  • No native mapping existed between old and new IDs

  • Historical audit data could not be linked to current records, breaking compliance requirements

Solution Design

To preserve history while accommodating new IDs, the team implemented a multi-stage ID translation process. A custom field was added to Salesforce entity tables to store the mapping between old and new IDs. The migration process then systematically translated all ID references in history tables.
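
A minimal PySpark sketch of that translation join, using hypothetical table names in place of the per-entity tables shown further below (old IDs without a mapping are kept and handled later during reconciliation):

from pyspark.sql.functions import coalesce, col

history = spark.table("stage.deep_clone_accounts_history")   # hypothetical clone of a history table
mapping = spark.table("stage.accounts_mapping_step_0")       # old_id / new_id pairs from the custom field

translated = (
    history
    .join(mapping, history["account_id"] == mapping["old_id"], "left")
    # Replace the old entity ID where a mapping exists; otherwise keep the old ID.
    .withColumn("account_id", coalesce(col("new_id"), history["account_id"]))
    .drop("old_id", "new_id")
)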

Implementation Approach

The migration was executed in five stages, each with validation checkpoints:

  1. ID Mapping Extraction: Created lookup tables linking old and new IDs by extracting mappings from Salesforce tables. For entities without mappings in the database, fetched historical records via Salesforce API.

  2. Primary ID Translation: Cloned original history tables for safety, then joined with mapping tables to translate primary entity IDs (account_id, opportunity_id, etc.) and user IDs (created_by_id).

  3. Field Value ID Translation: Identified history records where old_value and new_value fields contained entity IDs (data_type='EntityId'), then translated IDs within those JSON fields (see the sketch after this list). Preserved text values without translation where fields had mixed data types.

  4. History Table Replacement: Created backups of current history tables, removed pre-migration records with outdated IDs, and inserted translated history records with verified data integrity.

  5. Reconciliation: Identified records created in the new Salesforce instance before the migration date (which had no mapping) and restored their history from backup copies to ensure no audit gaps.
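
Stage 3 is the least mechanical step, because old_value and new_value only hold entity IDs on rows where data_type='EntityId'. A minimal sketch of that conditional translation, again with hypothetical table names:

from pyspark.sql.functions import coalesce, col, when

history = spark.table("stage.accounts_history_step_1")   # hypothetical output of stage 2
mapping = spark.table("stage.all_entities_mapping")      # hypothetical union of all old_id -> new_id pairs

# Separate aliases so old_value and new_value can each be joined against the mapping.
ov = mapping.selectExpr("old_id AS ov_old", "new_id AS ov_new")
nv = mapping.selectExpr("old_id AS nv_old", "new_id AS nv_new")

translated = (
    history
    .join(ov, history["old_value"] == ov["ov_old"], "left")
    .join(nv, history["new_value"] == nv["nv_old"], "left")
    # Only rows flagged as EntityId are translated; text values pass through untouched.
    .withColumn("old_value",
                when(col("data_type") == "EntityId", coalesce(col("ov_new"), col("old_value")))
                .otherwise(col("old_value")))
    .withColumn("new_value",
                when(col("data_type") == "EntityId", coalesce(col("nv_new"), col("new_value")))
                .otherwise(col("new_value")))
    .drop("ov_old", "ov_new", "nv_old", "nv_new")
)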

Validation Strategy

At each stage, automated validation scripts verified:

  • Record count comparisons between input and output tables

  • Duplicate detection using history record IDs

  • Mapping completeness (flagging old IDs without corresponding new IDs)

  • Per-entity history count validation before and after translation

  • Sample record reviews to verify field value translation accuracy

For implementation examples, see the code snippets below:

ID mapping extraction from Salesforce tables
# Extract old-ID -> new-ID mappings from the custom masc_id__c field on each
# migrated entity table and persist them as step-0 Delta mapping tables.
tables = [
    {'table_name': "accounts"},
    {'table_name': "opportunities"},
    {'table_name': "services"},
    {'table_name': "cases"},
]

database = 'stage'

for table in tables:
    prefix = f"pro_monetization.{database}.salesforce_subscriptions_{table['table_name']}"
    mapping_activity_suffix = "salesforce_history_mapping_id_fix"

    # masc_id__c holds the entity's ID from the old Salesforce instance; id is the new one.
    mapping_df = spark.sql(f"""
        SELECT
            masc_id__c as old_id,
            id as new_id
        FROM {prefix}
        WHERE masc_id__c is not NULL
    """).dropDuplicates(['old_id'])

    print(f"{table['table_name']}: {mapping_df.count()} mappings extracted")

    mapping_df.write.mode('overwrite').format('delta').saveAsTable(
        f"{prefix}_mapping_step_0_{mapping_activity_suffix}"
    )

Validation checks for record counts and integrity
tables = [
    {'table_name': "accounts", 'id_field_name': "account_id"},
    {'table_name': "opportunities", 'id_field_name': "opportunity_id"},
    {'table_name': "services", 'id_field_name': "parent_id"},
    {'table_name': "cases", 'id_field_name': "case_id"},
]

database = 'stage'

for table in tables:
    prefix = f"pro_monetization.{database}.salesforce_subscriptions_{table['table_name']}"
    activity_suffix = "salesforce_history_id_fix"
    mapping_activity_suffix = "salesforce_history_mapping_id_fix"

    # Baseline: row count of the untouched deep clone taken before translation.
    count_deep_clone = spark.sql(f"""
        SELECT count(*)
        FROM pro_monetization.{database}.deep_clone_salesforce_subscriptions_{table['table_name']}_history
    """).collect()[0][0]

    # Row count of the translated output produced in step 1.
    count_output = spark.sql(f"""
        SELECT count(*)
        FROM {prefix}_history_output_step_1_{activity_suffix}
    """).collect()[0][0]

    # Duplicate detection: total rows minus distinct history record IDs.
    count_duplicates = spark.sql(f"""
        SELECT
            (SELECT COUNT(*) FROM {prefix}_history_output_step_1_{activity_suffix})
          - (SELECT COUNT(DISTINCT id) FROM {prefix}_history_output_step_1_{activity_suffix})
        AS total_duplicate_rows
    """).collect()[0][0]

    # Number of old_id -> new_id pairs available for this entity.
    mappings_count = spark.sql(f"""
        SELECT count(*)
        FROM {prefix}_mapping_step_0_{mapping_activity_suffix}
    """).collect()[0][0]

    # Old entity IDs in the history clone that have no mapping; the NOT IN subquery
    # must exclude NULL masc_id__c values, otherwise any NULL makes it match nothing.
    count_old_records_without_mapping = spark.sql(f"""
        SELECT count(DISTINCT({table['id_field_name']}))
        FROM pro_monetization.{database}.deep_clone_salesforce_subscriptions_{table['table_name']}_history
        WHERE {table['id_field_name']} not in (
            SELECT distinct(masc_id__c)
            FROM {prefix}
            WHERE masc_id__c is not NULL
        )
    """).collect()[0][0]

    diff_percentage = (count_deep_clone - count_output) * 100 / count_deep_clone if count_deep_clone > 0 else 0

    print(f"""
salesforce_subscriptions_{table['table_name']}
    - Original history count: {count_deep_clone}
    - Output history count: {count_output} (diff: {diff_percentage:.2f}%)
    - Duplicates in output: {count_duplicates}
    - Mappings count: {mappings_count}
    - Old records without mapping: {count_old_records_without_mapping}
    """)

Migration Outcomes

The Salesforce history migration successfully preserved audit trails across the system migration:

  • Migrated 7 major entity types with complete history preservation

  • Maintained audit trail integrity for compliance requirements

  • Zero data loss for entities with ID mappings

  • Documented rollback procedures tested in staging environment

  • Established reusable patterns for future Salesforce migrations

Key Lessons Learned

The Databricks platform migration provided valuable insights for similar projects:

  • Dependency Mapping is Critical: Understanding upstream and downstream dependencies prevents cascading failures and enables proper sequencing

  • Parallel Running Reduces Risk: Running legacy and new pipelines in parallel during transition provides validation and enables quick rollback

  • Incremental Migration Enables Progress: Starting with core datasets provides a stable foundation and builds team confidence

  • Ownership Drives Accountability: Clear team assignment improves code quality, documentation, and responsiveness to issues

  • Comprehensive Validation Prevents Data Loss: Automated validation at each stage catches issues early and enables confident decommissioning of legacy systems

Further Reading

For detailed migration best practices, validation approaches, and common pitfalls, see the 7.2. Data Migration Guide.