Skip to content

Query Pushdown Streamlit Application

Code Snippet 1: Migrations for Database

CREATE TABLE IF NOT EXISTS order_summary(
  __metadata jsonb NOT NULL,
  total_products INT NULL,
  total_order_quantity INT NULL,
  total_shipment_cost INT NULL,
  shipment_type varchar(255) NULL,
  order_id varchar(255) NULL,
  order_ts TIMESTAMP WITH TIME ZONE NULL,
  retailer_type varchar(255) NULL,
  retailer_name varchar(255) NULL,
  retailer_chain varchar(255) NULL,
  retailer_state varchar(255) NULL,
  retailer_city varchar(255) NULL,
  created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT (NOW() at time zone 'utc'),
  updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT (NOW() at time zone 'utc'),
  PRIMARY KEY (order_id)
);

CREATE TABLE IF NOT EXISTS order_summary_search (
  total_products INT NULL,
  total_order_quantity INT NULL,
  total_shipment_cost INT NULL,
  shipment_type varchar(255) NULL,
  order_id varchar(255) NULL,
  order_ts TIMESTAMP WITH TIME ZONE NULL,
  retailer_type varchar(255) NULL,
  retailer_name varchar(255) NULL,
  retailer_chain varchar(255) NULL,
  retailer_state varchar(255) NULL,
  retailer_city varchar(255) NULL,
  document_vectors tsvector NOT NULL,
  cdc_at timestamp without time zone NOT NULL,
  UNIQUE(order_id)
);

CREATE
OR REPLACE FUNCTION update_order_summary_table() RETURNS TRIGGER AS $ update_order_summary_table $ DECLARE new_uuid VARCHAR(255);

BEGIN new_uuid = new.order_id;

BEGIN
INSERT INTO
  order_summary_search (
    total_products,
    total_order_quantity,
    total_shipment_cost,
    shipment_type,
    order_id,
    order_ts,
    retailer_type,
    retailer_name,
    retailer_chain,
    retailer_state,
    retailer_city,
    document_vectors,
    cdc_at
  )
SELECT
  s_view.total_products,
  s_view.total_order_quantity,
  s_view.total_shipment_cost,
  s_view.shipment_type,
  s_view.order_id,
  s_view.order_ts,
  s_view.retailer_type,
  s_view.retailer_name,
  s_view.retailer_chain,
  s_view.retailer_state,
  s_view.retailer_city,
  (
    setweight(to_tsvector('BLANKSEARCHEXPR'), 'A') || setweight(
      to_tsvector(
        coalesce(
          array_to_string(
            regexp_split_to_array(s_view.order_id, '\\\\/|:|\\\\.|\\\\s'),
            ', '
          ),
          ''
        )
      ),
      'A'
    )
  ) AS document_vectors,
  NOW() AS cdc_at
FROM
  (
    SELECT
      total_products,
      total_order_quantity,
      total_shipment_cost,
      shipment_type,
      order_id,
      order_ts,
      retailer_type,
      retailer_name,
      retailer_chain,
      retailer_state,
      retailer_city
    FROM
      order_summary
    WHERE
      order_id = new_uuid
  ) AS s_view ON CONFLICT (order_id) DO
UPDATE
SET
  total_products = excluded.total_products,
  total_order_quantity = excluded.total_order_quantity,
  total_shipment_cost = excluded.total_shipment_cost,
  shipment_type = excluded.shipment_type,
  order_id = excluded.order_id,
  order_ts = excluded.order_ts,
  retailer_type = excluded.retailer_type,
  retailer_name = excluded.retailer_name,
  retailer_chain = excluded.retailer_chain,
  retailer_state = excluded.retailer_state,
  retailer_city = excluded.retailer_city,
  document_vectors = excluded.document_vectors,
  cdc_at = excluded.cdc_at;

RETURN NULL;

END;

END;

$ update_order_summary_table $ LANGUAGE plpgsql;

CREATE CONSTRAINT TRIGGER update_order_summary_table
AFTER
INSERT
  OR
UPDATE
  ON order_summary DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE update_order_summary_table();

CREATE INDEX part_doc_vec_idx ON order_summary_search USING GIN (document_vectors);

Code Snippet 2: Database Primitive/Resource YAML

version: v1
name: ordersumdb
type: database
description: part search database.
tags:
  - database
database:
  migrate:
    includes:
      - /migrations_poc
    command: up

Code Snippet 3: Beacon Service YAML

version: v1
name: order-summary-search
type: service
tags:
  - syndicate
  - service
service:
  replicas: 1
  compute: runnable-default
  ingress:
    enabled: true
    stripPath: true
    path: /order-summary/api/v1
    noAuthentication: true
  stack: beacon+rest
  envs:
    PGRST_OPENAPI_SERVER_PROXY_URI: https://<dataos-context>/order-summary/api/v1/order-summary/api/v1
  beacon:
    source:
      type: database
      name: ordersumdb
      workspace: public
  topology:
  - name: database
    type: input
    doc: order search database connection
  - name: rest-api
    type: output
    doc: serves up the order search database as a RESTFUL API
    dependencies:
      - database

Code Snippet 4: Flare Job YAML

---
version: v1
name: order-summ-data
type: workflow
description: This jobs query lens and write data to postgres table
workflow:
  title: Order Summary
  dag:
    - name: order-summary-data
      title: Order Summary
      spec:
        stack: flare:5.0
        compute: runnable-default
        stackSpec:
          job:
            explain: true
            inputs:
              - name: input_order
                query: |
                  SELECT
                    "product.total_products" as total_products,
                    "order_shipment.total_order_quantity" as total_order_quantity,
                    cast("order_shipment.total_shipment_cost" as int) as total_shipment_cost,
                    "order_shipment.shipment_type" as shipment_type,
                    "order.order_id" as order_id,
                    "order.order_date" as order_ts,
                    "retailer.type" as retailer_type,
                    "retailer.name" as retailer_name,
                    "retailer.chain" as retailer_chain,
                    "retailer.state" as retailer_state,
                    "retailer.city" as retailer_city
                  FROM
                    LENS (
                      SELECT
                        "product.total_products",
                        "order_shipment.total_order_quantity",
                        "order_shipment.total_shipment_cost",
                        "order_shipment.shipment_type",
                        "order.order_id",
                        "order.order_date",
                        "retailer.type",
                        "retailer.name",
                        "retailer.chain",
                        "retailer.state",
                        "retailer.city"
                      FROM
                        supplychain
                    )
                options:
                  SSL: "true"
                  driver: "io.trino.jdbc.TrinoDriver"

            logLevel: INFO

            outputs:
              - name: output01
                depot: dataos://ordersumdbdatabase:public?acl=rw
                driver: org.postgresql.Driver
            steps:
              - sink:
                - sequenceName: input_order
                  datasetName: order_summary
                  outputName: output01
                  outputType: JDBCQuery
                  outputOptions:
                    query:
                      INSERT INTO order_summary (__metadata, total_products, total_order_quantity, total_shipment_cost, shipment_type, order_id, order_ts,  retailer_type, retailer_name, retailer_chain, retailer_state, retailer_city) VALUES ( to_json(?::JSONB), ?, ?, ?, ?, ?, ?,?, ?, ?, ?, ?) ON CONFLICT (order_id) DO UPDATE SET __metadata = excluded.__metadata, total_products = excluded.total_products, total_order_quantity = excluded.total_order_quantity, total_shipment_cost = excluded.total_shipment_cost, shipment_type=excluded.shipment_type, order_id = excluded.order_id, order_ts = excluded.order_ts, retailer_type = excluded.retailer_type, retailer_name= excluded.retailer_name, retailer_chain= excluded.retailer_chain, retailer_state= excluded.retailer_state, retailer_city= excluded.retailer_city

https://touched-rattler.dataos.app/order-summary/api/v1/order_summary_search

https://touched-rattler.dataos.app/order-summary/api/v1/order_summary_search?document_vectors=phfts(english).{0}

Code Snippet 5: Streamlit Application Code

import webbrowser

import requests
import streamlit as st
from streamlit.components.v1 import components

api_url = "https://touched-rattler.dataos.app/order-summary/api/v1/order_summary_search?document_vectors=phfts(english).{0}"

st.markdown("<h1 style='text-align: center; color: black;'>Order 360</h1>", unsafe_allow_html=True)

st.write("\n")
st.write("\n")
st.markdown("**Please Enter Order Id: (Sample Order ID ORD-5000, ORD-5001, ORD-5002...)**")
order_id = st.text_input("", value="")

if len(order_id) < 8:
    st.write("Please enter valid order id")
else:
    st.write("API Call: {}".format(api_url.format(order_id)))
    try:
        data = requests.get(api_url.format(order_id)).json()
        if len(data) == 0:
            st.write("Please enter valid order id")
        else:
            st.subheader("*Order Information*")

            col1, col2, col3 = st.columns(3)
            with col1:
                st.markdown("***Order Id:***")
                st.write(data[0]['order_id'])

            with col3:
                st.markdown("***Timestamp:***")
                st.write(data[0]['order_ts'])

            st.markdown("""-------------""")

            col1, col2, col3 = st.columns(3)

            with col1:
                st.markdown("***Total Products:***")
                st.write("{0}".format(data[0]['total_products']))

            with col3:
                st.markdown("***Total Quantities:***")
                st.write("{0}".format(data[0]['total_order_quantity']))

            st.markdown("""-------------""")

            st.subheader("*Customer Information*")

            col1, col2, col3 = st.columns(3)
            with col1:
                st.markdown("***Name:***")
                st.write(data[0]['retailer_name'])
            with col2:
                st.markdown("***Type:***")
                st.write(data[0]['retailer_type'])
            with col3:
                st.markdown("***Chain:***")
                st.write(data[0]['retailer_chain'])

            st.markdown("""-------------""")

            col1, col2, col3 = st.columns(3)
            with col1:
                st.markdown("***State:***")
                st.write(data[0]['retailer_state'])

            with col2:
                st.markdown("***City:***")
                st.write(data[0]['retailer_city'])

            st.markdown("""-------------""")

            st.subheader("*Shipment Information*")
            col1, col2, col3 = st.columns(3)
            with col1:
                st.markdown("***Shipment Type:***")
                st.write(data[0]['shipment_type'])
            with col3:
                st.markdown("***Shipment Cost:***")
                st.write("{0}".format(data[0]['total_shipment_cost']))

    except:
        print()

Code Snippet 6: Docker File

FROM python:3.7.6
WORKDIR /app
COPY requirements.txt ./requirements.txt
RUN pip3 install -r requirements.txt
COPY . .
CMD streamlit run decision-product.py --server.port 8501 --server.address "0.0.0.0" --server.baseUrlPath /decission_product

Code Snippet 7: Requirements File

altair==4.2.0
attrs==22.1.0
backports.zoneinfo==0.2.1
blinker==1.5
cachetools==5.2.0
certifi==2022.9.24
charset-normalizer==2.1.1
click==8.1.3
commonmark==0.9.1
cycler==0.11.0
decision==0.3.1
decorator==5.1.1
entrypoints==0.4
fonttools==4.37.3
gitdb==4.0.9
GitPython==3.1.27
graphviz==0.20.1
idna==3.4
importlib-metadata==4.12.0
importlib-resources==5.9.0
Jinja2==3.1.2
jsonschema==4.16.0
kiwisolver==1.4.4
MarkupSafe==2.1.1
matplotlib==3.5.3
numpy==1.21.6
packaging==21.3
pandas==1.3.5
Pillow==9.2.0
pkgutil-resolve-name==1.3.10
protobuf==3.20.1
pyarrow==9.0.0
pydeck==0.8.0b3
Pygments==2.13.0
Pympler==1.0.1
pyparsing==3.0.9
pyrsistent==0.18.1
python-dateutil==2.8.2
pytz==2022.2.1
pytz-deprecation-shim==0.1.0.post0
requests==2.28.1
rich==12.5.1
seaborn==0.12.0
semver==2.13.0
six==1.16.0
smmap==5.0.0
streamlit==1.13.0
toml==0.10.2
toolz==0.12.0
tornado==6.2
trino==0.316.0
typing-extensions==4.3.0
tzdata==2022.4
tzlocal==4.2
urllib3==1.26.12
validators==0.20.0
zipp==3.8.1

Example:

https://touched-rattler.dataos.app/decision_product/