Fetching Data from Stock Data API using Bento
Step-by-Step Guide to Fetching Stock Market Data Using Bento
This guide outlines the process of fetching stock market data from the Polygon API with Bento, streaming it into DataOS through Pulsar, and then writing it to Icebase with a Flare workflow.
1. Create an Account and Obtain an API Key
To access stock market data, an API key is required. Follow these steps to obtain one:
- Visit the Polygon API documentation.
- Sign up for an account or log in if an account already exists.
- Navigate to the API key section and generate a new key.
- Store the API key securely, as it is required for authentication in subsequent steps; the sketch after this list shows one way to keep it out of configuration files.
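For instance, the key can be injected into the service as an environment variable instead of being hardcoded in a manifest. A minimal sketch, assuming the hypothetical variable name POLYGON_API_KEY (Bento configs can interpolate environment variables with ${VAR} syntax):

```yaml
# Sketch: inject the Polygon key into the service environment
# (POLYGON_API_KEY is a hypothetical name, not part of the original manifest)
envs:
  POLYGON_API_KEY: <your-polygon-api-key>
```

The http processor URL shown later in this guide could then end in apiKey=${POLYGON_API_KEY} instead of a literal key.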
2. Ingest Data into DataOS Using Pulsar
Apache Pulsar serves as the messaging system for streaming data into DataOS. To ingest stock market data:
- Set up a Pulsar topic dedicated to stock market data ingestion (a depot manifest sketch follows this list).
- Configure DataOS to subscribe to this topic for real-time data processing.
- Ensure the necessary access credentials and permissions are granted for seamless data ingestion.
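In DataOS, a Pulsar cluster is typically reachable through a depot; the workflow later in this guide reads from a depot named publicstreams. A hypothetical sketch of such a depot manifest is shown below; the field names and broker URLs are assumptions and should be checked against the DataOS depot documentation:

```yaml
# Hypothetical Pulsar depot manifest; broker URLs are placeholders
version: v1
name: publicstreams
type: depot
tags:
  - pulsar
depot:
  type: PULSAR
  description: Pulsar depot for streaming stock market data
  external: true
  spec:
    adminUrl: http://<pulsar-broker-host>:8080
    serviceUrl: pulsar://<pulsar-broker-host>:6650
```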
3. Fetch Data Using Bento and Write to Pulsar
Bento facilitates the retrieval of stock market data and its integration with Pulsar. The process involves:
- Writing a Bento configuration that interacts with the Polygon API.
- Using the obtained API key to authenticate requests.
- Defining the data schema and transformation logic, if needed (see the mapping sketch after this list).
- Publishing the fetched data to the designated Pulsar topic for further processing in DataOS.
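As an optional transformation step, a Bloblang mapping processor could reshape the Polygon payload before it is published. The sketch below is not part of the original manifest; it assumes the v2 aggregates response layout, where each bar carries o/h/l/c/v/t fields:

```yaml
pipeline:
  processors:
    # Sketch: flatten the aggregates response into one OHLCV record per bar
    - mapping: |
        root = this.results.map_each(bar -> {
          "ticker": this.ticker,
          "open": bar.o,
          "high": bar.h,
          "low": bar.l,
          "close": bar.c,
          "volume": bar.v,
          "timestamp": bar.t
        })
```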
4. Define a YAML Configuration File for Bento
A YAML configuration file is required to define the Bento stack, specifying input sources, processing logic, and output destinations. The key sections include:
- Input Section: Configures the API request, specifying parameters such as stock symbols, time intervals, and authentication details.
- Pipeline Section: Defines any necessary data transformations, filtering, or enrichment processes before storage.
- Output Section: Specifies the Pulsar topic where the processed stock market data will be published.

An example service manifest combining these sections:
```yaml
version: v1beta1
name: pulsar-data-stream
type: service
tags:
  - api
description: API gateway server
service:
  title: API Data
  replicas: 1
  servicePort: 8098
  autoScaling:
    enabled: true
    minReplicas: 2
    maxReplicas: 4
    targetMemoryUtilizationPercentage: 80
    targetCPUUtilizationPercentage: 80
  ingress:
    enabled: true
    path: /stockdatapple
    noAuthentication: false
  tags:
    - wbi
    - trigger
  # DataOS env link
  envs:
    METIS_REGISTRY_URL: http://metis-api.metis.svc.cluster.local:5000/api/v2
  # Assigning the resources
  resources:
    requests:
      cpu: 100m
      memory: 128Mi
    limits:
      cpu: 1000m
      memory: 1024Mi
  stack: bento
  logLevel: DEBUG
  stackSpec:
    # Use an HTTP server input to receive requests at 0.0.0.0:8098/stockdatapple
    input:
      label: ""
      http_server:
        address: 0.0.0.0:8098
        path: /stockdatapple
        allowed_verbs:
          - POST
          - GET
        timeout: 60s
        # Use rate limit for pagination purposes
        rate_limit: ""
        sync_response:
          status: ${! meta("http_status_code") }
    # Pipeline section: call the Polygon API here
    pipeline:
      processors:
        - log:
            level: DEBUG
            message: "Meta: ${! meta() } Payload: ${! json() }"
        - http:
            url: https://api.polygon.io/v2/aggs/ticker/AAPL/range/1/day/2022-03-01/2022-04-02?apiKey=vPN3I7pGcKag2ampTWSVZCwBDD55cVF5
            verb: GET
            headers:
              Content-Type: application/json
            rate_limit: ""
            timeout: 30s
            parallel: true
        # Branch on the HTTP status code: at or below 300 logs the response,
        # above 300 logs an error notification
        - switch:
            - check: meta("http_status_code").number() <= 300
              processors:
                - log:
                    level: DEBUG
                    message: 'Stock Response: ${! json() } Status: ${! meta("http_status_code")}'
            - check: meta("http_status_code").number() > 300
              processors:
                - log:
                    # NOTE: the source snippet breaks off at this log entry;
                    # the level and message below are an assumed completion
                    level: ERROR
                    message: 'Error Response: ${! json() } Status: ${! meta("http_status_code")}'
```
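The manifest above ends before its output section. A sketch of what that section could look like, slotted under stackSpec alongside input and pipeline; it assumes a pulsar output pointed at the cluster's broker URL (a placeholder below) and a sync_response output that pairs with the sync_response block on the HTTP server input:

```yaml
    output:
      broker:
        pattern: fan_out
        outputs:
          # Publish processed records to the topic read later by the Flare workflow
          - pulsar:
              url: pulsar://<pulsar-broker-host>:6650  # placeholder
              topic: stockdatapple
          # Echo the result back to the HTTP caller
          - sync_response: {}
```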
5. Read Data from Pulsar and Write to Icebase

Once the stock data has been published to the Pulsar topic, a Flare workflow reads it through the publicstreams depot and writes it to an Iceberg table in Icebase:
```yaml
version: v1
name: pulsar-applestock-data
type: workflow
tags:
  - pulsar
  - read
  - applestock
description: this job reads data from pulsar and writes to icebase
# Ingestion YAML starts
workflow:
  dag:
    - name: pulsar-appledata
      title: read avro data from pulsar
      description: read avro data from pulsar
      spec:
        tags:
          - Connect
        stack: flare:5.0
        # A DataOS API key for a user with the operator tag is required
        envs:
          DATAOS_RUN_AS_APIKEY: dG9rZW5fc29jaWFsbHlfdHlwaWNhbGx5X2dyYXRlZnVsX3NuYWlsLjAyYzhiZWU4LWJkNzctNDQ2Zi1hMzJlLTJhZGNjMjg5OGM3Ng==
        stackSpec:
          job:
            explain: true
            # "/stockdatapple" is the Pulsar topic name;
            # "publicstreams" is the Pulsar depot created in DataOS
            inputs:
              - name: input
                dataset: dataos://publicstreams:default/stockdatapple
                options:
                  startingOffsets: earliest
                isStream: false
            logLevel: INFO
            outputs:
              - name: stockdata
                depot: dataos://icebase:sample?acl=rw
            steps:
              - sink:
                  - sequenceName: input
                    datasetName: stock_pulsar
                    outputName: stockdata
                    outputType: Iceberg
                    description: stockdata data ingested from pulsar
                    outputOptions:
                      saveMode: overwrite
                      iceberg:
                        properties:
                          write.format.default: parquet
                          write.metadata.compression-codec: gzip
                    tags:
                      - Connect
                    title: Apple Stock Data
```
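With both manifests saved, each can be applied through the DataOS CLI (for example, dataos-ctl apply -f followed by the manifest file path). Once the workflow completes, the ingested records should be queryable as the stock_pulsar dataset in Icebase.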