There's an easier way to do this. See the logic below.
Input DataFrame:
```python
from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("value", StringType(), True)
])
df = spark.createDataFrame([('1', 'a', '11'), ('1', 'b', '12'), ('2', 'c', '21')], schema)
df.show(truncate=False)
```

```
+---+----+-----+
| id|type|value|
+---+----+-----+
|  1|   a|   11|
|  1|   b|   12|
|  2|   c|   21|
+---+----+-----+
```
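(The snippet assumes a live `spark` session, as you'd have in `pyspark`, `spark-shell`, or a notebook. Running it as a standalone script, you'd create one first; a minimal sketch:)

```python
from pyspark.sql import SparkSession

# Only needed outside a shell/notebook, where `spark` already exists.
spark = SparkSession.builder.appName("pivot-to-json").getOrCreate()
```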
First, pivot the `type` column and aggregate it using its corresponding `value`, as below:
```python
df1 = df.groupBy("id").pivot("type").agg(first("value"))
df1.show()
```

```
+---+----+----+----+
| id|   a|   b|   c|
+---+----+----+----+
|  1|  11|  12|null|
|  2|null|null|  21|
+---+----+----+----+
```
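As an aside, if the distinct `type` values are known up front, you can pass them to `pivot` so Spark skips the extra pass it otherwise runs to discover them. A minimal sketch, assuming the values are exactly `a`, `b`, and `c`:

```python
# Listing the pivot values explicitly avoids a separate distinct() job over `type`.
df1 = df.groupBy("id").pivot("type", ["a", "b", "c"]).agg(first("value"))
```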
Once you have this, you have to replace the `null` values with a string equivalent, because Spark drops `null` fields while creating a JSON column from a `struct` type. The `fillna("null")` keeps those keys in the JSON; the `regexp_replace` then blanks the placeholder out. See below:
```python
df1.fillna("null") \
   .withColumn("json", regexp_replace(to_json(struct(col("a"), col("b"), col("c"))), "null", "")) \
   .drop("a", "b", "c") \
   .show(truncate=False)
```

```
+---+--------------------------+
|id |json                      |
+---+--------------------------+
|1  |{"a":"11","b":"12","c":""}|
|2  |{"a":"","b":"","c":"21"}  |
+---+--------------------------+
```
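Note that `regexp_replace(..., "null", "")` would also clobber any legitimate value containing the substring `null`. If you'd rather avoid the placeholder round-trip entirely, one alternative (a sketch producing the same output on this data) is to `coalesce` each column to an empty string before building the struct:

```python
from pyspark.sql.functions import coalesce, col, lit, struct, to_json

# Replace nulls with "" per column, then serialize; no post-hoc string surgery needed.
json_cols = [coalesce(col(c), lit("")).alias(c) for c in ["a", "b", "c"]]
df1.withColumn("json", to_json(struct(*json_cols))).drop("a", "b", "c").show(truncate=False)
```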