How to write data to an Iceberg table using the Avro file format with DataOS PyFlare?
# Import necessary libraries
import os
import random
from datetime import datetime, timedelta

from pyspark.sql import Row
from pyspark.sql.functions import col
from pyflare.sdk import load, save, session_builder

# Iceberg table that this example writes and then reads back. It is defined once
# so that the depot access grant, the save() target and the load() source
# always refer to the same table.
DEPOT_ADDRESS = "dataos://icebase:pyflaresdk/test_write_04"

# Given names sampled for the synthetic "name" column.
FIRST_NAMES = [
    "Alice", "Bob", "Charlie", "David", "Emily", "Frank", "Grace", "Henry",
    "Ivy", "Jack", "Karen", "Leo", "Mia", "Noah", "Olivia", "Peter", "Quinn",
    "Rachel", "Sam", "Tom", "Uma", "Victor", "Wendy", "Xander", "Yvonne", "Zane",
]


def _random_datetime_in_1998():
    """Return a random midnight ``datetime`` from 1998-01-01 to 1998-12-31 inclusive."""
    start_date = datetime(1998, 1, 1)
    end_date = datetime(1998, 12, 31)
    # randint is inclusive on both ends, so 1998-12-31 itself can be chosen.
    random_days = random.randint(0, (end_date - start_date).days)
    return start_date + timedelta(days=random_days)


def generate_random_date_of_birth():
    """Generate a random date of birth in 1998.

    Returns:
        The date as a ``"YYYY-MM-DD"`` string.
    """
    return _random_datetime_in_1998().strftime("%Y-%m-%d")


def GenerateData():
    """Generate between 1000 and 10000 (inclusive) random person records.

    Returns:
        A list of ``Row(name, date_of_birth, age)``. ``name`` is drawn from
        ``FIRST_NAMES``. ``date_of_birth`` is a midnight ``datetime`` in 1998.
        ``age`` is an int in [20, 99].
    """
    num_rows = random.randint(1000, 10000)
    data = []
    for _ in range(num_rows):
        name = random.choice(FIRST_NAMES)
        # Build the datetime directly. Formatting it to a string and parsing
        # it back would give the same value.
        date_of_birth_ts = _random_datetime_in_1998()
        age = random.randint(20, 99)
        data.append(Row(name=name, date_of_birth=date_of_birth_ts, age=age))
    return data


# Spark configuration as (key, value) pairs for the session builder.
sparkConf = [
    ("spark.app.name", "Dataos Sdk Spark App"),  # Spark application name
    ("spark.master", "local[*]"),  # Run Spark locally on all available cores
]

# DataOS connection settings. Read these from the environment so the API key
# does not have to be written into source; the placeholders act as fallbacks.
DATAOS_FQDN = os.environ.get("DATAOS_FQDN", "{{dataos fqdn}}")
token = os.environ.get("DATAOS_APIKEY", "{{dataos apikey token}}")

# Create a Spark session with DataOS settings. The "rw" argument presumably
# requests read/write access to the depot.
spark = (
    session_builder.SparkSessionBuilder(log_level="INFO")
    .with_spark_conf(sparkConf)
    .with_user_apikey(token)
    .with_dataos_fqdn(DATAOS_FQDN)
    .with_depot(DEPOT_ADDRESS, "rw")
    .build_session()
)

# Options for saving in Iceberg format:
#   fanout-enabled    - Iceberg fanout writer; input need not be clustered by
#                       the partition column
#   write-format      - write Iceberg data files as Avro
#   iceberg.partition - PyFlare partition spec: identity partition on "name"
opts = {
    "fanout-enabled": "true",
    "write-format": "avro",
    "iceberg": {
        "partition": [
            {"type": "identity", "column": "name"},
        ],
    },
}

# Generate random data and create a single-partition DataFrame.
avro_iceberg = spark.createDataFrame(GenerateData()).repartition(1)

# Save the DataFrame to DataOS as an Iceberg table with Avro data files, in overwrite mode.
save(
    name=DEPOT_ADDRESS,
    dataframe=avro_iceberg,
    format="iceberg",
    mode="overwrite",
    options=opts,
)

# Read the table back from DataOS in Iceberg format and display the first 10 rows.
load(name=DEPOT_ADDRESS, format="iceberg").show(10)

# Call spark.stop() here if you want to stop the Spark session.