Audit Guide

Here’s a practical guide to auditing and analyzing datasets, workloads, and costs within a Databricks environment. This checklist helps you understand data usage, performance, and cost management.

Inventory: What Data Assets Exist?

Show the top N largest tables in Databricks by size. The script below iterates over every catalog and schema, runs DESCRIBE DETAIL on each table, and returns the largest tables by storage footprint.

from pyspark.sql import SparkSession
from typing import Optional

spark = SparkSession.builder.getOrCreate()

# === PARAMETERS ===
MAX_TABLES = 100  # Number of top tables to return
SKIP_SCHEMAS = {"information_schema"}  # System schemas to ignore

# === FUNCTIONS ===


def format_size(size_in_bytes: Optional[int]) -> str:
    """Convert a raw byte count into a human-readable string."""
    if size_in_bytes is None:
        return "N/A"
    size = float(size_in_bytes)
    for unit in ["B", "KB", "MB", "GB", "TB", "PB"]:
        if size < 1024:
            return f"{size:.2f} {unit}"
        size /= 1024
    return f"{size:.2f} PB"


# === MAIN ===
results = []

catalogs = [row.catalog for row in spark.sql("SHOW CATALOGS").collect()]

for catalog in catalogs:
    try:
        spark.sql(f"USE CATALOG `{catalog}`")
        schemas_raw = spark.sql("SHOW SCHEMAS").collect()
        # The schema column is named `namespace` or `databaseName` depending on the runtime version.
        schema_names = [
            getattr(row, "namespace", getattr(row, "databaseName", None))
            for row in schemas_raw
        ]
    except Exception as e:
        print(f"⚠️ Skipping catalog {catalog}: {e}")
        continue

    for schema in schema_names:
        if not schema or schema.lower() in SKIP_SCHEMAS:
            continue

        try:
            tables = spark.sql(f"SHOW TABLES IN `{catalog}`.`{schema}`").collect()
        except Exception as e:
            print(f"⚠️ Skipping schema {catalog}.{schema}: {e}")
            continue

        for table in tables:
            # Skip views where the runtime exposes an entityType column
            if hasattr(table, "entityType") and table.entityType.lower() == "view":
                continue

            full_name = f"{catalog}.{schema}.{table.tableName}"
            print(f"🔍 Checking table: {full_name}")

            try:
                detail = spark.sql(
                    f"DESCRIBE DETAIL `{catalog}`.`{schema}`.`{table.tableName}`"
                ).collect()[0].asDict()
                size = detail.get("sizeInBytes", 0)
                results.append({
                    "catalog": catalog,
                    "schema": schema,
                    "table": table.tableName,
                    "format": detail.get("format", "unknown"),
                    "lastModified": detail.get("lastModified"),
                    "sizeInBytes": size,
                    "sizeReadable": format_size(size),
                    "numFiles": detail.get("numFiles", 0),
                })
            except Exception as e:
                print(f"❌ Skipped {full_name}: {e}")

# === DISPLAY RESULT ===
if results:
    df = spark.createDataFrame(results)
    top_n_df = df.orderBy("sizeInBytes", ascending=False).limit(MAX_TABLES)
    display(top_n_df)
else:
    print("❌ No table metadata collected.")

Usage: Which datasets are regularly queried?