How to Overwrite Dynamic Iceberg Partitions Using DataOS PyFlare
"""Example: overwrite dynamic Iceberg partitions in DataOS using PyFlare.

Generates a random dataset, writes it to an Iceberg table partitioned by
`name` in overwrite mode, then reads it back and verifies the contents.
"""

# Import necessary libraries
import random
from datetime import datetime, timedelta

from pyspark.sql import Row
from pyspark.sql.functions import col
from pyflare.sdk import load, save, session_builder


def generate_random_date_of_birth():
    """Return a random date in 1998 formatted as 'YYYY-MM-DD'."""
    start_date = datetime(1998, 1, 1)
    end_date = datetime(1998, 12, 31)
    random_days = random.randint(0, (end_date - start_date).days)
    return (start_date + timedelta(days=random_days)).strftime("%Y-%m-%d")


def generate_data():
    """Build a list of 1,000-10,000 random `Row(name, date_of_birth, age)` records."""
    num_rows = random.randint(1000, 10000)
    first_names = [
        "Alice", "Bob", "Charlie", "David", "Emily", "Frank", "Grace",
        "Henry", "Ivy", "Jack", "Karen", "Leo", "Mia", "Noah", "Olivia",
        "Peter", "Quinn", "Rachel", "Sam", "Tom", "Uma", "Victor",
        "Wendy", "Xander", "Yvonne", "Zane",
    ]
    data = []
    for _ in range(num_rows):
        name = random.choice(first_names)
        date_of_birth_str = generate_random_date_of_birth()
        # Parse back to a datetime so the column is written as a timestamp,
        # not a plain string.
        date_of_birth_ts = datetime.strptime(date_of_birth_str, "%Y-%m-%d")
        age = random.randint(20, 99)
        data.append(Row(name=name, date_of_birth=date_of_birth_ts, age=age))
    return data


# Define Spark configuration
sparkConf = [
    ("spark.app.name", "Dataos Sdk Spark App"),  # Set the Spark application name
    ("spark.master", "local[*]"),  # Use local mode for Spark execution
]

# DataOS configuration — replace the placeholders with your environment values.
DATAOS_FQDN = "{{dataos fqdn}}"
token = "{{dataos apikey token}}"

# Create a Spark session with DataOS settings.
# "rw" grants both read and write access to the depot.
spark = session_builder.SparkSessionBuilder(log_level="INFO") \
    .with_spark_conf(sparkConf) \
    .with_user_apikey(token) \
    .with_dataos_fqdn(DATAOS_FQDN) \
    .with_depot("dataos://icebase:pyflaresdk/test_write_03", "rw") \
    .build_session()

# Ignore warning for "The specified path does not exist."
# Generate random data and create a single-partition DataFrame for overwriting.
overwriteDf = spark.createDataFrame(generate_data()).repartition(1)

# Options for saving in Iceberg format with identity partitioning on `name`.
opts = {
    "fanout-enabled": "true",
    "iceberg": {
        "partition": [
            {
                "type": "identity",
                "column": "name",
            }
        ]
    },
}

# Save DataFrame to DataOS in Iceberg format with overwrite mode.
save(
    name="dataos://icebase:pyflaresdk/test_write_03",
    dataframe=overwriteDf,
    format="iceberg",
    mode="overwrite",
    options=opts,
)

# Load the table back from DataOS to verify the write.
temp_df_part = load(name="dataos://icebase:pyflaresdk/test_write_03", format="iceberg")

# Display the total number of records.
print("Total Records: ", temp_df_part.count())

# Filter records where name is 'Alice' and show a sample.
f_df = temp_df_part.where("name = 'Alice'")
f_df.show(5)

# Count the number of records with name 'Alice'.
alice_records = f_df.count()
print("Alice Records: ", alice_records)