Kafka Depots

To execute Flare Jobs on top of a Kafka depot, you first need to create the depot. If you have already created one, continue reading; otherwise, proceed to the following link: Kafka Depot
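
If you have not created the depot yet, the manifest below is a minimal sketch of what a Kafka Depot manifest typically looks like; the depot name, broker address, and description are illustrative placeholders, so refer to the Kafka Depot page for the authoritative template.

version: v1
name: kafka1
type: depot
tags:
  - kafka
layer: user
depot:
  type: KAFKA
  description: Kafka depot for sample topics   # illustrative description
  external: true
  spec:
    brokers:
      - <broker-host>:9092                     # illustrative broker address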

To run a Flare Job, all you need is the UDL address of the input or output dataset for the reading and writing scenarios, respectively. Apart from this, you also need the file format of the data and some additional properties.
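
A UDL address encodes the depot, the collection, and the dataset (for Kafka, the topic). Broken down for the address used in the read examples below:

# dataos://[depot]:[collection]/[topic]
dataset: dataos://kafka1:default/random_users_ktok03   # depot: kafka1, collection: default, topic: random_users_ktok03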

Read Config

For reading the data, we need to configure the name, dataset, and format properties in the inputs section of the YAML. Along with these, there are some additional properties to be mentioned in the options section. For instance, if your dataset name is sample_data, the UDL address is dataos://kafka1:default/random_users_ktok03, and the file format is KafkaAvro, then the inputs section will be as follows:

inputs:
  - name: sample_data
    dataset: dataos://kafka1:default/random_users_ktok03
    format: KafkaAvro
    isStream: false
    options:
      startingOffsets: earliest
      kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="0b9c4dd98ca9cc944160";
      kafka.sasl.mechanism: PLAIN
      kafka.security.protocol: SASL_PLAINTEXT
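
The options above are standard Kafka source options: startingOffsets controls where consumption begins, while the kafka.sasl.* and kafka.security.protocol entries carry the SASL credentials for the broker. For a continuous (streaming) read of the same topic, the block can be switched to streaming mode; the sketch below is illustrative and assumes the same depot and credentials as the batch sample:

inputs:
  - name: sample_data
    dataset: dataos://kafka1:default/random_users_ktok03
    format: KafkaAvro
    isStream: true                # continuous read instead of a one-off batch
    options:
      startingOffsets: latest     # consume only records arriving after the job starts
      kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="0b9c4dd98ca9cc944160";
      kafka.sasl.mechanism: PLAIN
      kafka.security.protocol: SASL_PLAINTEXT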

Sample Read configuration YAML

Let’s take a case scenario where the dataset is stored in a Kafka Depot and you have to read it from the source, perform some transformation steps, and write it to Icebase, which is a managed depot within DataOS. The read config YAML will be as follows:

kafka_depot_read.yml
version: v1
name: read-kafka-01
type: workflow
tags:
  - read-kafka
description: This job reads data from Kafka
workflow:
  dag:
    - name: sample-read-kafka-01
      title: read kafka sample data
      description: This job ingests sample data from the Kafka depot
      spec:
        stack: flare:5.0
        compute: runnable-default
        stackSpec:
          job:
            explain: true
            logLevel: INFO
            #triggerDuration: "20 seconds"
            inputs:
              - name: sample_data
                dataset: dataos://kafka1:default/random_users_ktok03
                format: KafkaAvro
                isStream: false
                options:
                  startingOffsets: earliest
                  kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="0b9c4dd98ca9cc944160";
                  kafka.sasl.mechanism: PLAIN
                  kafka.security.protocol: SASL_PLAINTEXT
            outputs:
              - name: finalDf
                dataset: dataos://icebase:sample_datasets/sample_dataset_kafkaavro?acl=rw
                format: Iceberg
                options:
                  saveMode: overwrite
                  iceberg:
                    properties:
                      write.metadata.previous-versions-max: "10"
                      history.expire.max-snapshot-age-ms: "7200000"
            steps:
              - sequence:
                - name: finalDf
                  sql: SELECT * FROM sample_data

    - name: dataos-tool-read-sample-dataset
      spec:
        stack: toolbox
        compute: runnable-default
        stackSpec:
          dataset: dataos://icebase:sample_datasets/sample_dataset_kafkaavro?acl=rw
          action:
            name: set_version
            value: latest

Write Config

For writing the data to a Kafka depot, we need to configure the name, depot, and format properties in the outputs section of the YAML, along with a checkpointLocation where the job stores its checkpoints. For instance, if your data is to be written to the depot address dataos://kafka:default?acl=rw under the name output01 and the file format is KafkaJson, then the outputs section will be as follows:

outputs:
  - name: output01
    depot: dataos://kafka:default?acl=rw
    checkpointLocation: dataos://icebase:sys01/checkpoints/ny_taxi/output01/nyt01?acl=rw
    format: KafkaJson
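
When Flare writes to a Kafka depot, the target topic may not exist yet. The full example below therefore sets the ENABLE_TOPIC_CREATION environment variable on the job, which allows the missing topic to be created; the snippet here is lifted from that example:

envs:
  ENABLE_TOPIC_CREATION: "true"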

Sample Write configuration YAML

Let’s take a case scenario where the output dataset is to be stored in a Kafka Depot and the input data is read from another depot within DataOS (here, Avro data from a third-party depot). The write config YAML will be as follows:

kafka_depot_write.yml
version: v1
name: write-kafka-2
type: workflow
tags:
  - NY-Taxi
description: This job reads sample Avro data and writes it to a Kafka depot in JSON format
workflow:
  dag:
    - name: kafka-json-write
      title: write in kafka from json
      description: write to kafka json file format
      spec:
        tags:
          - Connect
        stack: flare:5.0
        compute: runnable-default
        envs:
          ENABLE_TOPIC_CREATION: "true"
        stackSpec:
          job:
            explain: true
            inputs:
              - name: sample_data
                dataset: dataos://thirdparty01:sampledata/avro
                format: avro
                isStream: false
            logLevel: INFO
            outputs:
              - name: output01
                depot: dataos://kafka:default?acl=rw
                checkpointLocation: dataos://icebase:sys01/checkpoints/ny_taxi/output01/nyt01?acl=rw
                format: KafkaJson
            steps:
              - sequence:
                - name: output01
                  sql: SELECT * FROM sample_data
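
Once the workflow has run, the data written to the Kafka depot can be read back with a Flare input block like any other Kafka dataset. The sketch below is purely illustrative: the topic name output01 is an assumption, since the actual topic created under dataos://kafka:default depends on the depot and job configuration.

inputs:
  - name: written_data
    dataset: dataos://kafka:default/output01   # illustrative topic name
    format: KafkaJson
    isStream: false
    options:
      startingOffsets: earliest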