Inputs¶
An input is a source of data piped through an array of optional processors:
input:
  label: my_redis_input
  redis_streams:
    url: tcp://localhost:6379
    streams:
      - benthos_stream
    body_key: body
    consumer_group: benthos_group

  # Optional list of processing steps
  processors:
    - mapping: |
        root.document = this.without("links")
        root.link_count = this.links.length()
Some inputs have a logical end. For example, a `csv` input ends once the last row is consumed. When this happens, the input terminates gracefully and Benthos shuts itself down once all messages have been fully processed.
It's also possible to specify a logical end for an input that otherwise doesn't have one by using the `read_until` input, which checks a condition against each consumed message to determine whether it should be the last.
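A minimal sketch of this pattern, wrapping a Kafka input so that it stops at a sentinel message. The field names `input`, `check`, and `restart_input` are assumed to match the `read_until` input in recent Benthos versions, and the condition `this.type == "end"` is a hypothetical example, so check both against the reference for your version.

```yaml
input:
  read_until:
    # The wrapped input, which on its own has no logical end
    input:
      kafka:
        addresses: [ TODO ]
        topics: [ foo, bar ]
        consumer_group: foogroup
    # Hypothetical condition: the message that satisfies it is treated as the last
    check: this.type == "end"
    # Assumed option: do not restart the wrapped input if it closes early
    restart_input: false
```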
Brokering¶
Only one input is configured at the root of a Benthos config. However, the root input can be a broker which combines multiple inputs and merges the streams:
input:
  broker:
    inputs:
      - kafka:
          addresses: [ TODO ]
          topics: [ foo, bar ]
          consumer_group: foogroup

      - redis_streams:
          url: tcp://localhost:6379
          streams:
            - benthos_stream
          body_key: body
          consumer_group: benthos_group
Labels¶
Inputs have an optional field `label` that can uniquely identify them in observability data such as metrics and logs. This can be useful when running configs with multiple inputs, since otherwise their metrics labels are generated based on their composition. For more information, check out the metrics documentation.
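As an illustration, labels could be attached to each child of a broker so that the two inputs are distinguishable in metrics and logs. The label names `orders_kafka` and `events_redis` are made up for this sketch.

```yaml
input:
  broker:
    inputs:
      # Hypothetical label identifying the Kafka input
      - label: orders_kafka
        kafka:
          addresses: [ TODO ]
          topics: [ foo, bar ]
          consumer_group: foogroup

      # Hypothetical label identifying the Redis streams input
      - label: events_redis
        redis_streams:
          url: tcp://localhost:6379
          streams:
            - benthos_stream
          body_key: body
          consumer_group: benthos_group
```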
Sequential Reads¶
Sometimes it's useful to consume a sequence of inputs, where each input is only consumed once its predecessor is fully drained. You can achieve this with the `sequence` input.
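A minimal sketch of a sequence, assuming the `sequence` input takes a list under `inputs` and the `csv` input takes a list of `paths`. The file path and the final marker message are hypothetical.

```yaml
input:
  sequence:
    inputs:
      # Consumed first; ends once the last row has been read
      - csv:
          paths: [ ./dataset.csv ]  # hypothetical file path
      # Consumed only after the CSV input is drained; emits one marker message
      - generate:
          count: 1
          mapping: 'root = {"status": "finished"}'
```

In this sketch both child inputs have a logical end, so the sequence as a whole ends after the marker message is emitted.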
Generating Messages¶
It's possible to generate data with Benthos using the `generate` input, which is also a convenient way to trigger scheduled pipelines.
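A minimal sketch of a scheduled trigger, assuming the `generate` input accepts an `interval` and a Bloblang `mapping`. The one-minute interval and the output fields `id` and `created_at` are illustrative choices, not defaults.

```yaml
input:
  generate:
    # Emit one message per minute (illustrative value)
    interval: 1m
    # Bloblang mapping that builds each generated message
    mapping: |
      root.id = uuid_v4()
      root.created_at = now()
```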
Various Input Sources and Their Categories¶
The `inputs` section consists of settings that enable the retrieval of data from diverse sources, such as cloud buckets from providers like AWS, GCP, and Azure, as well as APIs and social media sites including Twitter and Discord. These sources are classified into categories such as Services, Network, Local, and Utility.
To access the YAML configuration for a specific input source, refer to the accompanying database, which lists the categories for each source. Clicking the corresponding ‘OPEN’ button within the name section of an input source opens a popup window that displays the relevant YAML configuration and a description of each field.
Name | Category |
---|---|
amqp_0_9 | AMQP, Services |
amqp_1 | AMQP, Services |
aws_kinesis | AWS, Services |
aws_s3 | AWS, Services |
aws_sqs | AWS, Services |
azure_blob_storage | Azure, Services |
azure_queue_storage | Azure, Services |
azure_table_storage | Azure, Services |
batched | Utility |
beanstalkd | Services |
broker | Utility |
cassandra | Services |
csv | Local |
discord | Services, Social |
dynamic | Utility |
file | Local |
gcp_bigquery_select | GCP, Services |
gcp_cloud_storage | GCP, Services |
gcp_pubsub | GCP, Services |
generate | Utility |
hdfs | Services |
http_client | Network |
http_server | Network |
inproc | Utility |
kafka | Services |
kafka_franz | Services |
mongodb | Services |
nanomsg | Network |
parquet | Local |
pulsar | Services |
sftp | Network |
socket | Network |
socket_server | Network |
stdin | Local |
twitter_search | Services, Social |
mqtt | Services |