Concurrent writes

This scenario tests what happens when multiple jobs write concurrently to the same location.

Solution approach

This scenario works only with Iceberg, because Iceberg supports multiple concurrent writes. A workflow with two jobs is defined, and both jobs write to the same data location.

Implementation details

This scenario is tested on NY-Taxi data. A workflow is created with two jobs (in one DAG), each writing the data for a different vendor (the data is filtered by vendor_id). The workflow is submitted once for each of the following save modes:

append save mode

In append mode, the data from both jobs should be written.

overwrite save mode

When Flare's overwrite mode is dynamic, every write operation replaces the partitions for which the job produces rows. In this case, only the data from the job that finishes last should be visible.
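
The overwrite variant of the job is not shown on this page. As a rough sketch only, the sink options for that run might look like the fragment below. It reuses the `overwrite-mode: dynamic` property that appears commented out in the append job, and it assumes that `saveMode: overwrite` is the value Flare accepts; verify both before use. All other keys are copied from the append job.

```yaml
# Sketch only: sink options for the overwrite run (not taken from the original job)
outputOptions:
  saveMode: overwrite            # assumed value; the job on this page uses append
  iceberg:
    properties:
      write.format.default: parquet
      write.metadata.compression-codec: gzip
      overwrite-mode: dynamic    # taken from the commented-out line in the append job
    partitionSpec:
      - type: month
        column: date_col
        name: month
```

Because both jobs partition by month on date_col, they produce rows for the same partitions, which is why the job that finishes last would replace the other job's data.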

Outcomes

Queries were run to fetch the data and validate the expected behavior. It was also possible to query the data while it was being written.

Job to test concurrent writes

# This workflow contains two jobs, both with save mode set to append
version: v1beta1
name: workflow-ny-taxi-parallel-write
type: workflow
tags:
- Connect
- NY-Taxi
description: The job ingests small NY-Taxi data files and combines them into one file
workflow:
  title: Connect NY Taxi
  dag:
  - name: nytaxi-vendor-one
    title: NY-taxi data ingester-parallel 
    description: The job ingests NY-Taxi data from dropzone into raw zone
    spec:
      tags:
      - Connect
      - NY-Taxi
      stack: flare:5.0
      stackSpec:
        job:
          explain: true
          inputs:
           - name: ny_taxi
             dataset: dataos://thirdparty01:none/ny-taxi-data?acl=r
             format: json
             isStream: false

          logLevel: INFO
          outputs:
            - name: output01
              depot: dataos://icebase:raw01?acl=rw
          steps:
          - sink:
             - sequenceName: ny_taxi_ts
               datasetName: ny_taxi_04
               outputName:  output01
               outputType: Iceberg
               outputOptions:
                 saveMode: append
                 iceberg:
                  properties:
                      write.format.default: parquet
                      write.metadata.compression-codec: gzip
                      #overwrite-mode: dynamic # used only when a single partition's data needs to be replaced with saveMode set to overwrite; that was tested in a separate job
                  partitionSpec:
                    - type: month # identity partitioning was used at vendor_id level
                      column: date_col # col name = vendor_id
                      name: month

               tags:
                  - Connect
                  - NY-Taxi
               title: NY-Taxi Data Partitioned

            sequence:
              - name: ny_taxi_changed_dateformat
                sql: select *, to_timestamp(pickup_datetime/1000) as date_col from ny_taxi where vendor_id = 1

              - name: ny_taxi_ts
                sql: SELECT *, date_format(now(), 'yyyyMMddHHmm') as version, now() as
                  ts_ny_taxi FROM ny_taxi_changed_dateformat

  - name: nytaxi-vendor-two
    title: NY-taxi data ingester-parallel
    description: The job ingests NY-Taxi data from dropzone into raw zone
    spec:
      tags:
      - Connect
      - NY-Taxi
      stack: flare:5.0
      stackSpec:
        job:
          explain: true
          inputs:
           - name: ny_taxi
             dataset: dataos://thirdparty01:none/ny-taxi-data?acl=r
             format: json
             isStream: false

          logLevel: INFO
          outputs:
            - name: output02
              depot: dataos://icebase:raw01?acl=rw
          steps:
          - sink:
             - sequenceName: ny_taxi_ts
               datasetName: ny_taxi_04
               outputName:  output02
               outputType: Iceberg
               outputOptions:
                 saveMode: append
                 iceberg:
                  properties:
                      write.format.default: parquet
                      write.metadata.compression-codec: gzip
                  partitionSpec:
                    - type: month
                      column: date_col
                      name: month

               tags:
                  - Connect
                  - NY-Taxi
               title: NY-Taxi Data Partitioned

            sequence:
              - name: ny_taxi_changed_dateformat
                sql: select *, to_timestamp(pickup_datetime/1000) as date_col from ny_taxi where vendor_id = 2

              - name: ny_taxi_ts
                sql: SELECT *, date_format(now(), 'yyyyMMddHHmm') as version, now() as
                  ts_ny_taxi FROM ny_taxi_changed_dateformat