# Databricks notebook source
# Sample only: validation + quarantine + quality score
#
# Reads a headered CSV from the landing zone, casts the key columns,
# routes rows with a null `id` or `amount` to quarantine, writes the
# valid rows to the curated zone, and reports a simple quality score
# (valid rows / total rows).

from pyspark.sql import functions as F

input_path = "/mnt/sample/landing/source.csv"
curated_path = "/mnt/sample/curated/source_parquet"
quarantine_path = "/mnt/sample/quarantine/source_bad_rows"

raw_df = spark.read.option("header", True).csv(input_path)

# Cast key columns; a failed cast yields null, so the null checks below
# catch both missing values and values that are not parseable as int/double.
validated_df = (
    raw_df
    .withColumn("id", F.col("id").cast("int"))
    .withColumn("amount", F.col("amount").cast("double"))
    .withColumn("is_valid", F.col("id").isNotNull() & F.col("amount").isNotNull())
)

# Cache: validated_df feeds two writes and two counts; without caching,
# each action would re-read and re-parse the CSV from scratch.
validated_df = validated_df.cache()

# `is_valid` is built from isNotNull() checks, so it is never null —
# the column and its negation partition the rows exactly.
good_df = validated_df.filter(F.col("is_valid")).drop("is_valid")
bad_df = validated_df.filter(~F.col("is_valid")).drop("is_valid")

good_df.write.mode("overwrite").parquet(curated_path)
bad_df.write.mode("overwrite").parquet(quarantine_path)

total_cnt = validated_df.count()
good_cnt = good_df.count()
# Guard against an empty input file; use 0.0 so the score is always a float.
quality_score = (good_cnt / total_cnt) if total_cnt > 0 else 0.0

print(f"Total rows: {total_cnt}")
print(f"Valid rows: {good_cnt}")
print(f"Quality score: {quality_score:.2%}")

# Release the cached data now that all actions are done.
validated_df.unpersist()