Inputs

An input is a source of data piped through an array of optional processors:

input:
  label: my_redis_input

  redis_streams:
    url: tcp://localhost:6379
    streams:
      - benthos_stream
    body_key: body
    consumer_group: benthos_group

  # Optional list of processing steps
  processors:
    - mapping: |
        root.document = this.without("links")
        root.link_count = this.links.length()

Some inputs have a logical end. For example, a csv input ends once the last row is consumed. When this happens, the input terminates gracefully and Benthos shuts itself down once all messages have been fully processed.

It's also possible to specify a logical end for an input that otherwise doesn't have one with the read_until input, which checks a condition against each consumed message in order to determine whether it should be the last.
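
As a rough sketch of how read_until might be configured, the following wraps the redis_streams input from the earlier example and stops once a message satisfies the check. The condition `this.type == "end"` and the `type` field are hypothetical and only illustrate the shape of a check; adapt them to your own message format.

```yaml
input:
  read_until:
    # The wrapped child input, which on its own would never end
    input:
      redis_streams:
        url: tcp://localhost:6379
        streams:
          - benthos_stream
        body_key: body
        consumer_group: benthos_group

    # Hypothetical condition: treat a message whose `type` field
    # equals "end" as the last message to consume
    check: this.type == "end"

    # Do not restart the child input if it closes before the check passes
    restart_input: false
```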

Brokering

Only one input is configured at the root of a Benthos config. However, the root input can be a broker which combines multiple inputs and merges the streams:

input:
  broker:
    inputs:
      - kafka:
          addresses: [ TODO ]
          topics: [ foo, bar ]
          consumer_group: foogroup

      - redis_streams:
          url: tcp://localhost:6379
          streams:
            - benthos_stream
          body_key: body
          consumer_group: benthos_group

Labels

Inputs have an optional field label that can uniquely identify them in observability data such as metrics and logs. This is useful when running configs with multiple inputs; without a label, their metrics labels are generated based on their composition. For more information, check out the metrics documentation.
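
A minimal sketch of labelling each child of a broker, so that the two streams can be told apart in metrics and logs. The label names and the Kafka address `localhost:9092` are assumptions chosen for illustration, not values from the original config.

```yaml
input:
  broker:
    inputs:
      # Hypothetical label for the Kafka child input
      - label: orders_kafka
        kafka:
          addresses: [ localhost:9092 ] # assumed address
          topics: [ foo, bar ]
          consumer_group: foogroup

      # Hypothetical label for the Redis child input
      - label: events_redis
        redis_streams:
          url: tcp://localhost:6379
          streams:
            - benthos_stream
          body_key: body
          consumer_group: benthos_group
```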

Sequential Reads

Sometimes it's useful to consume a sequence of inputs, where an input is only consumed once its predecessor is drained fully. You can achieve this with the sequence input.
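
One way this could look, assuming a local CSV file at the hypothetical path `./dataset.csv`: the csv input is read to completion first, and only then does the generate input emit a single closing message.

```yaml
input:
  sequence:
    inputs:
      # Consumed first; ends once the last row has been read
      - csv:
          paths: [ ./dataset.csv ] # hypothetical path

      # Consumed only after the csv input is fully drained;
      # emits one message and then ends
      - generate:
          count: 1
          mapping: 'root = {"status": "finished"}'
```

Because both children have a logical end, the sequence as a whole also ends, and Benthos shuts down once the final message has been processed.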

Generating Messages

It's possible to generate data with Benthos using the generate input, which is also a convenient way to trigger scheduled pipelines.
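
A small sketch of a generate input that emits a message on a fixed schedule. The five-second interval and the field names `id` and `created_at` are illustrative assumptions.

```yaml
input:
  generate:
    # Emit one message every five seconds (assumed interval)
    interval: 5s

    # A count of 0 means the input keeps generating indefinitely
    count: 0

    # Bloblang mapping that builds each generated message
    mapping: |
      root.id = uuid_v4()
      root.created_at = now()
```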

Various Input Sources and Their Categories

The inputs section contains settings for retrieving data from diverse sources, such as cloud storage buckets from providers like AWS, GCP, and Azure, as well as APIs and social platforms including Twitter and Discord. Sources are grouped into categories such as Services, Network, Local, and Utility.

The YAML configuration for each input source is available in the accompanying database, which also lists the categories for each source. Clicking the ‘OPEN’ button next to an input's name opens a popup showing the relevant YAML configuration and a description of each field.

Name                  Category
amqp_0_9              AMQP, Services
amqp_1                AMQP, Services
aws_kinesis           AWS, Services
aws_s3                AWS, Services
aws_sqs               AWS, Services
azure_blob_storage    Azure, Services
azure_queue_storage   Azure, Services
azure_table_storage   Azure, Services
batched               Utility
beanstalkd            Services
broker                Utility
cassandra             Services
csv                   Local
discord               Services, Social
dynamic               Utility
file                  Local
gcp_bigquery_select   GCP, Services
gcp_cloud_storage     GCP, Services
gcp_pubsub            GCP, Services
generate              Utility
hdfs                  Services
http_client           Network
http_server           Network
inproc                Utility
kafka                 Services
kafka_franz           Services
mongodb               Services
nanomsg               Network
parquet               Local
pulsar                Services
sftp                  Network
socket                Network
socket_server         Network
stdin                 Local
twitter_search        Services, Social
mqtt                  Services