Create json column from multiple rows using pyspark

120


I would like to have a column in a pyspark df in json format.

example df:

```
id  type  value
1   a     11
1   b     12
2   c     21
```

expected outcome:

```
id  json
1   {"a":"11","b":"12","c":""}
2   {"a":"","b":"","c":"21"}
```

I tried to use

```python
df.groupBy(df.id) \
    .agg(collect_list(to_json(create_map(df.type, df.value))).alias('json'))
```

but it returns nested JSON like this: {{"a":"11"},{"b":"12"}}

Can someone help me with this? Thank you!

Total Answers: 3

15

Answer 1: Create json column from multiple rows using pyspark

example df:

```python
df = spark.createDataFrame(
    [
        ('1', 'a', '11'),
        ('1', 'b', '12'),
        ('2', 'c', '21')
    ], ['id', 'type', 'value']
)

from pyspark.sql import functions as F

df.groupBy("id") \
    .agg(F.map_from_entries(F.collect_list(F.struct("type", "value"))).alias("type_value")) \
    .withColumn('json', F.to_json('type_value')) \
    .show(truncate=False)
```

```
+---+------------------+-------------------+
|id |type_value        |json               |
+---+------------------+-------------------+
|1  |{a -> 11, b -> 12}|{"a":"11","b":"12"}|
|2  |{c -> 21}         |{"c":"21"}         |
+---+------------------+-------------------+
```
12

Answer 2: Create json column from multiple rows using pyspark

There's an easier way to do this. See the logic below -

Input_DF

```python
from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("value", StringType(), True)
])

df = spark.createDataFrame([('1', 'a', '11'), ('1', 'b', '12'), ('2', 'c', '21')], schema)
df.show(truncate=False)
```

```
+---+----+-----+
| id|type|value|
+---+----+-----+
|  1|   a|   11|
|  1|   b|   12|
|  2|   c|   21|
+---+----+-----+
```

First, pivot the type column and aggregate it using its corresponding value as below -

```python
df1 = df.groupBy("id").pivot("type").agg(first("value"))
df1.show()
```

```
+---+----+----+----+
| id|   a|   b|   c|
+---+----+----+----+
|  1|  11|  12|null|
|  2|null|null|  21|
+---+----+----+----+
```

Once you have this, you need to replace the null values with their string equivalent, because Spark drops null-valued fields when creating a JSON column from a struct type. See below -

```python
df1.select(*(df1.columns)) \
    .fillna("null") \
    .withColumn("json", regexp_replace(to_json(struct(col("a"), col("b"), col("c"))), "null", "")) \
    .drop("a", "b", "c") \
    .show(truncate=False)
```

```
+---+--------------------------+
|id |json                      |
+---+--------------------------+
|1  |{"a":"11","b":"12","c":""}|
|2  |{"a":"","b":"","c":"21"}  |
+---+--------------------------+
```
8

Answer 3: Create json column from multiple rows using pyspark

```python
df = spark.createDataFrame(
    [
        ('1', 'a', '11'),
        ('1', 'b', '12'),
        ('2', 'c', '21')
    ], ['id', 'type', 'value']
)

from pyspark.sql import functions as F
from pyspark.sql.types import *

def input_type(json):
    dic_json = eval(json)
    for x in ['a', 'b', 'c']:
        if x not in dic_json:
            dic_json[x] = ''
    return dic_json

input_type_udf = F.udf(input_type, StringType())

df.groupBy("id") \
    .agg(F.map_from_entries(F.collect_list(F.struct("type", "value"))).alias("type_value")) \
    .withColumn('json', F.to_json('type_value')) \
    .withColumn('dic_json', input_type_udf(F.col('json'))) \
    .show(truncate=False)
```
```
+---+------------------+-------------------+----------------+
|id |type_value        |json               |dic_json        |
+---+------------------+-------------------+----------------+
|1  |{a -> 11, b -> 12}|{"a":"11","b":"12"}|{a=11, b=12, c=}|
|2  |{c -> 21}         |{"c":"21"}         |{a=, b=, c=21}  |
+---+------------------+-------------------+----------------+
```
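Two rough edges in this UDF are worth noting: `eval` on an arbitrary string is unsafe, and returning a Python dict from a `StringType` UDF produces the map's string form (`{a=11, b=12, c=}`), which is not valid JSON. A hedged alternative (a sketch; the helper name and the fixed key list are assumptions) parses and re-serializes with the standard `json` module:

```python
import json

def fill_missing_keys(js, keys=('a', 'b', 'c')):
    """Parse a JSON object string and return it with every expected
    key present, filling missing keys with empty strings."""
    d = json.loads(js)
    return json.dumps({k: d.get(k, '') for k in keys})

# Plain-Python check of the helper; wrap it with
# F.udf(fill_missing_keys, StringType()) to plug it into the
# pipeline above in place of input_type_udf.
print(fill_missing_keys('{"c":"21"}'))  # {"a": "", "b": "", "c": "21"}
```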