How to read and write partitioned data within Icebase using DataOS PyFlare?

# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql import Row
import random
from datetime import datetime, timedelta
from pyflare.sdk import load, save, session_builder

# Define Spark configuration
sparkConf = [
    ("spark.app.name", "Iceberg Read Write Partitioned Data"),  # Set the Spark application name
    ("spark.master", "local[*]"),  # Use local mode for Spark execution
]

# DataOS configuration
DATAOS_FQDN = "{{dataos-fqdn}}"
token = "{{dataos-apikey-token}}"

# Create a Spark session with DataOS settings
spark = session_builder.SparkSessionBuilder(log_level="INFO") \
    .with_spark_conf(sparkConf) \
    .with_user_apikey(token) \
    .with_dataos_fqdn(DATAOS_FQDN) \
    .with_depot(depot_name="icebase", acl="rw") \
    .build_session()


# Generate random data and create a DataFrame
def GenerateData():
    num_rows = random.randint(1000, 10000)  # Randomly determine the number of rows
    first_names = ["Alice", "Bob", "Charlie", "David", "Emily", "Frank", "Grace",
                   "Henry", "Ivy", "Jack", "Karen", "Leo", "Mia", "Noah", "Olivia",
                   "Peter", "Quinn", "Rachel", "Sam", "Tom", "Uma", "Victor",
                   "Wendy", "Xander", "Yvonne", "Zane"]
    data = []
    for _ in range(num_rows):
        name = random.choice(first_names)
        date_of_birth = generate_random_date_of_birth()
        age = random.randint(20, 99)
        data.append(Row(name=name, date_of_birth=date_of_birth, age=age))
    return data


# Function to generate a random date of birth within a specified range
def generate_random_date_of_birth():
    start_date = datetime(1998, 1, 1)
    end_date = datetime(1998, 12, 31)
    random_days = random.randint(0, (end_date - start_date).days)
    return (start_date + timedelta(days=random_days)).strftime("%Y-%m-%d")


# Create DataFrame with generated data and repartition it
df = spark.createDataFrame(GenerateData()).repartition(1)

# Display the first 10 records
df.show(10)

# Ignore warning for "The specified path does not exist."

# Define partitioning options for Iceberg format
opts = {
    "fanout-enabled": "true",
    "iceberg": {
        "partition": [
            {"type": "bucket", "column": "age", "bucket_count": 2},
            {"type": "identity", "column": "name"},
            {"type": "year", "column": "date_of_birth"},
        ]
    },
}

# Save DataFrame to DataOS in Iceberg format with partitioning
save(name="dataos://icebase:pyflaresdk/test_write_02", dataframe=df,
     format="iceberg", mode="overwrite", options=opts)

# Load and display data from DataOS in Iceberg format
load(name="dataos://icebase:pyflaresdk/test_write_02", format="iceberg").show()

# Count the total number of records in the stored dataset
load(name="dataos://icebase:pyflaresdk/test_write_02", format="iceberg").count()

# Uncomment the following line if you wish to stop the Spark session
# spark.stop()
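
To build intuition for what the three partition entries in `opts` do, the following is a minimal, standalone sketch in plain Python (no Spark or PyFlare required) of how each row could be mapped to a partition key. It is illustrative only: the `partition_key` helper and the sample rows are hypothetical and not part of the PyFlare SDK, and the bucket step uses a plain modulo as a stand-in for the Murmur3-based hash that Iceberg actually applies. Iceberg also stores the `year` transform internally as years since 1970; the calendar year is shown here for readability.

```python
from collections import Counter
from datetime import date

# Partition spec mirroring the "partition" list in the `opts` dictionary above.
PARTITION_SPEC = [
    {"type": "bucket", "column": "age", "bucket_count": 2},
    {"type": "identity", "column": "name"},
    {"type": "year", "column": "date_of_birth"},
]


def partition_key(row, spec=PARTITION_SPEC):
    """Return the tuple of partition values a row maps to (hypothetical helper)."""
    key = []
    for field in spec:
        value = row[field["column"]]
        if field["type"] == "identity":
            # Identity: the column value itself is the partition value.
            key.append(value)
        elif field["type"] == "year":
            # Year: only the year part of the date is kept.
            key.append(date.fromisoformat(value).year)
        elif field["type"] == "bucket":
            # ASSUMPTION: modulo is a simplified stand-in for Iceberg's
            # Murmur3 hash; real bucket assignments will differ.
            key.append(value % field["bucket_count"])
        else:
            raise ValueError(f"Unsupported partition type: {field['type']}")
    return tuple(key)


# Hypothetical sample rows shaped like the generated DataFrame records.
rows = [
    {"name": "Alice", "date_of_birth": "1998-03-14", "age": 34},
    {"name": "Alice", "date_of_birth": "1998-11-02", "age": 52},
    {"name": "Bob", "date_of_birth": "1998-07-21", "age": 41},
]

# Count how many rows land in each (bucket, name, year) partition.
partition_counts = Counter(partition_key(r) for r in rows)
for key, count in sorted(partition_counts.items()):
    print(key, count)
```

Rows that share the same bucket, name, and birth year land in the same partition, so the two "Alice" rows here are grouped together while "Bob" gets a partition of his own. With 26 names, 2 buckets, and a single birth year, the generated dataset above would be spread over at most 52 partitions.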