Benthos on DataOS¶
When dealing with never-ending streams of data within DataOS, e.g. IoT, real-time stock trades, server logs, and event data, the Service resource is the answer to your prayers. But let's face it, stream data can be a tough nut to crack, and that's where Benthos comes in as your trusty sidekick: the perfect stack for processing your stream data quickly and efficiently without breaking a sweat.
With a Benthos Service, you get the best of both worlds: the Service resource gives you scalable, robust pipelines enriched with built-in orchestration, cataloging, and governance capabilities, while the Benthos stack takes care of the heavy lifting on your stream data.
And the best part? You can do it all with ease thanks to declarative YAML, which lets you focus on what really matters, processing your data and getting those precious insights, rather than worrying about the nitty-gritty details of fitting the pieces of the puzzle together.
Prerequisites¶
Get the DataOS API Key¶
For writing data to Fastbase depots, you must obtain your user API key. Execute the following command to retrieve it:
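dataos-ctl user apikey get

# If no API key has been created yet, create one first
dataos-ctl user apikey create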
Obtain the Pulsar-admin tag¶
To write data in Pulsar format to the DataOS Pulsar environment, you require the pulsar-admin tag. To check your available tags, execute the following command:
dataos-ctl user get
# Output
NAME | ID | TYPE | EMAIL | TAGS
---------------|-------------|--------|----------------------|---------------------------------
IamGroot | iamgroot | person | iamgroot@tmdc.io | roles:direct:collated,
| | | | roles:id:data-dev,
| | | | roles:id:depot-manager,
| | | | roles:id:depot-reader,
| | | | roles:id:pulsar-admin, # required tag
| | | | roles:id:system-dev,
| | | | roles:id:user,
| | | | users:id:iamgroot
Let’s Begin!¶
To begin, we will acquire data from the Random User API, a remarkably user-friendly and easily accessible API. We will then transform this data on the Benthos stack by applying a Bloblang transformation, and finally write it to a DataOS Fastbase depot. So let's dive in and get the show on the road, shall we?
Create a Benthos Service YAML¶
Configure the Service Resource Section¶
At the core of any Benthos Service lies the Service resource section, which defines a Service resource through a set of YAML fields and configurations. A Service is a long-running process that either receives or serves API requests. The Benthos stack is then invoked within the Service to perform the required transformations.
version: v1
name: ${{randomusertest}}
type: service
service:
  compute: ${{runnable-default}}
  replicas: ${{1}}
  servicePort: ${{8080}} # dataos port
  ingress:
    enabled: ${{true}}
    path: ${{/test007}} # url path
    noAuthentication: ${{true}}
    annotations:
      konghq.com/strip-path: "false"
      kubernetes.io/ingress.class: kong
  stack: ${{benthos}} # dataos stack
  logLevel: ${{DEBUG}}
  tags:
    - ${{service}}
For a deeper understanding of Service and its associated YAML configurations, please refer to the Service page.
Configure the Benthos Stack-specific Section¶
The Benthos stack-specific section comprises several components within the YAML configuration file. A sample YAML configuration is provided below:
benthos:
  input:
    ${{input-component}}
  pipeline:
    processors:
      ${{pipeline-component}}
  output:
    ${{output-component}}
Let's break down each component:
Input Component¶
The input component specifies the configuration for the data source. Here's an example configuration for the http_client input:
http_client:
  url: ${{https://randomuser.me/api/}}
  verb: ${{GET}}
  headers:
    Content-Type: ${{application/json}}
You can refer to the http_client input documentation for more details on available configuration options.
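Since the http_client input polls the endpoint in a continuous loop, you may want to throttle the request rate. Below is a minimal sketch assuming a Benthos v4-style configuration; the api_limit label and the one-request-per-five-seconds budget are illustrative choices, not part of the original example:

input:
  http_client:
    url: https://randomuser.me/api/
    verb: GET
    rate_limit: api_limit # reference to the rate limit resource below

rate_limit_resources:
  - label: api_limit # illustrative name
    local:
      count: 1 # allow at most one request
      interval: 5s # per five-second window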
Pipeline Component¶
The pipeline component applies a series of processors to the data stream. Processors enable you to manipulate, transform, or enrich the data as it flows through the pipeline. Here's an example of a pipeline configuration using the bloblang processor:
pipeline:
  processors:
    - label: my_blobl
      bloblang: |
        root.page = this.info.page
        root.seed = this.info.seed
        root.age = this.results.0.dob.age
        root.dob = this.results.0.dob.date
        root.email = this.results.0.email
        root.gender = this.results.0.gender
        root.name = this.results.0.id.name
        root.city = this.results.0.location.city
In this example, the bloblang processor is applied to the data stream. It extracts specific fields from the input document and maps them onto the output document for further processing. Feel free to explore the available processors and their configurations in the Benthos Processors documentation.
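If you have a local Benthos binary available, you can sanity-check a Bloblang mapping with the benthos blobl subcommand before deploying the Service. A sketch, using a hand-written payload that mimics the shape of the Random User API response:

# pipe one JSON document per line through the mapping
echo '{"info":{"page":1,"seed":"abc"},"results":[{"dob":{"age":30,"date":"1993-01-01T00:00:00Z"},"email":"jane@example.com","gender":"female","id":{"name":"SSN"},"location":{"city":"Oslo"}}]}' |
  benthos blobl 'root.page = this.info.page
root.age = this.results.0.dob.age
root.city = this.results.0.location.city'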
Output Component¶
The output component determines where the processed data is sent after passing through the pipeline. You can configure various output options, such as writing to files, sending messages to message queues, or interacting with external APIs. Here's an example configuration for a Fastbase depot:
output:
  broker:
    pattern: fan_out
    outputs:
      - plugin:
          address: ${{dataos://fastbase:default/test007}}
          metadata:
            auth:
              token:
                enabled: true
                token: ${{DataOS-User-API-Key}}
            description: ${{Random users data}}
            format: AVRO
            schema: ${{"{\"name\":\"default\",\"type\":\"record\",\"namespace\":\"defaultNamespace\",\"fields\":[{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"dob\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"gender\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"page\",\"type\":\"int\"},{\"name\":\"seed\",\"type\":\"string\"}]}"}}
            schemaLocation: http://registry.url/schemas/ids/12
            title: ${{Random Users Info}}
            type: STREAM
        type: dataos_depot
      - stdout: {}
In this example, the output is configured to use the broker pattern with a fan_out strategy. The data is sent both to the dataos_depot plugin output, which writes to the fastbase:default/test007 address, and to the stdout output for logging purposes.
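For readability, the escaped schema string in the configuration above corresponds to the following Avro record schema:

{
  "name": "default",
  "type": "record",
  "namespace": "defaultNamespace",
  "fields": [
    { "name": "age", "type": "int" },
    { "name": "city", "type": "string" },
    { "name": "dob", "type": "string" },
    { "name": "email", "type": "string" },
    { "name": "gender", "type": "string" },
    { "name": "name", "type": "string" },
    { "name": "page", "type": "int" },
    { "name": "seed", "type": "string" }
  ]
}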
Feel free to explore the available output options and their configurations in the Benthos outputs documentation.
Apply the YAML file¶
You can apply the YAML file to create a Service resource within the DataOS environment using the command given below:
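dataos-ctl apply -f ${{yaml-file-path}} -w ${{workspace}}

# Sample
dataos-ctl apply -f benthos-service.yaml -w public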
Check Topic Consumption in the Fastbase Depot¶
Check Runtime¶
dataos-ctl -t service -w ${{workspace}} -n ${{service-name}} get runtime -r
# Sample
dataos-ctl -t service -w public -n pulsar-random get runtime -r
List all tenants¶
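dataos-ctl fastbase tenant list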
List all Namespaces within the Public Tenant¶
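dataos-ctl fastbase namespace list -t ${{tenant}}

# Sample
dataos-ctl fastbase namespace list -t public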
List all Topics in the public/default Namespace¶
dataos-ctl fastbase topic list -n ${{namespace}}
# Sample
dataos-ctl fastbase topic list -n public/default
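Finally, to verify that records are landing on the topic, you can consume from it. The flags below are an assumption and may differ across dataos-ctl versions, so confirm with dataos-ctl fastbase topic consume --help:

# assumed invocation: -p takes the full persistent topic path
dataos-ctl fastbase topic consume -p persistent://public/default/test007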