Pulsar Depots

Read Config

pulsar_depot_read.yml
version: v1
name: read-pulsar-01
type: workflow
tags:
  - pulsar
  - read
description: this job reads data from pulsar and writes it to the lakehouse
workflow:
  dag:
    - name: read-pulsar
      title: read avro data from pulsar
      description: read avro data from pulsar
      spec:
        tags:
          - Connect
        stack: flare:6.0
        compute: runnable-default
        flare:
          job:
            explain: true
            inputs:
              - name: input
                dataset: dataos://sanitypulsar:default/write_pulsar_12
                isStream: false
                options:
                  startingOffsets: earliest
            logLevel: INFO
            outputs:
              - name: input
                dataset: dataos://lakehouse:smoketest/read_pulsar_12?acl=rw
                format: Iceberg
                options:
                  saveMode: overwrite
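
The read config above sets `isStream: false`, so the topic is read as a bounded batch starting from the earliest offset. A possible streaming variant of the same input is sketched below. It assumes that setting `isStream: true` makes Flare consume the topic as a continuous stream. The dataset address and `startingOffsets` value are copied from the config above. Any further streaming settings the job may need (for example checkpointing or a non-overwrite save mode on the output) are not shown and should be checked against the Flare documentation.

```yaml
# Sketch only: streaming variant of the input block from pulsar_depot_read.yml.
# Assumption: isStream: true switches this input from batch to streaming mode.
inputs:
  - name: input
    dataset: dataos://sanitypulsar:default/write_pulsar_12
    isStream: true
    options:
      startingOffsets: earliest
```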

Write Config

pulsar_depot_write.yml
# pulsar admin tag required
version: v1
name: write-pulsar-01
type: workflow
tags:
  - pulsar
  - write
description: this job reads data from thirdparty and writes it to pulsar
workflow:
  dag:
    - name: write-pulsar
      title: write avro data to pulsar
      description: write avro data to pulsar
      spec:
        tags:
          - Connect
        stack: flare:6.0
        compute: runnable-default
        flare:
          job:
            explain: true
            inputs:
              - name: input
                dataset: dataos://thirdparty01:none/city
                format: csv
                schemaPath: dataos://thirdparty01:none/schemas/avsc/city.avsc
            logLevel: INFO
            outputs:
              - name: output
                dataset: dataos://sanitypulsar:default/write_pulsar_12?acl=rw
                format: pulsar
                tags:
                  - Connect
                title: City Data Pulsar
            steps:
              - sequence:
                - name: output
                  sql: SELECT *, date_format(now(), 'yyyyMMddHHmm') AS version, now() AS ts_city FROM input