Document Detective
In this example, you'll learn how to use DBOS and LlamaIndex to build and serverlessly deploy a chat agent that can index PDF documents and answer questions about them.
This example highlights how to use DBOS to build a reliable data processing pipeline that can run many tasks concurrently, in this case performing parallel ingestion of many documents.
For example, after ingesting the last three years of Apple 10-K filings, the app can accurately answer questions about Apple's financials.
All source code is available on GitHub.
Import and Initialize the App
Let's start off with imports and initializing DBOS.
```python
import os
from tempfile import TemporaryDirectory
from typing import List

import requests
from dbos import DBOS, Queue, WorkflowHandle, load_config
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from llama_index.core import Settings, StorageContext, VectorStoreIndex
from llama_index.readers.file import PDFReader
from llama_index.vector_stores.postgres import PGVectorStore
from pydantic import BaseModel, HttpUrl

from .schema import chat_history

app = FastAPI()
DBOS(fastapi=app)
```
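The `chat_history` table imported above is defined in a separate schema module and created by the app's migrations. As a rough sketch of what it contains, inferred from how it's used later in this example (the exact definition lives in the repo):

```python
# schema.py (illustrative sketch, not the repo's exact definition):
# a table storing each chat message, who sent it, and when.
from sqlalchemy import (
    Boolean, Column, DateTime, Integer, MetaData, String, Table, func,
)

metadata = MetaData()

chat_history = Table(
    "chat_history",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("content", String, nullable=False),
    Column("is_user", Boolean, nullable=False),
    Column("created_at", DateTime, server_default=func.now()),
)
```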
Next, let's initialize LlamaIndex to use Postgres with pgvector as its vector store:
```python
def configure_index():
    Settings.chunk_size = 512
    dbos_config = load_config()
    db = dbos_config["database"]
    vector_store = PGVectorStore.from_params(
        database=db["app_db_name"],
        host=db["hostname"],
        password=db["password"],
        port=db["port"],
        user=db["username"],
        perform_setup=False,  # Tables are created by migrations, not at runtime
    )
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    index = VectorStoreIndex([], storage_context=storage_context)
    chat_engine = index.as_chat_engine()
    return index, chat_engine


index, chat_engine = configure_index()
```
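Note that `perform_setup=False` tells LlamaIndex not to create its tables at startup; instead, the app's database migrations (run by `dbos migrate` below) are expected to set up the database. As a minimal sketch, assuming Alembic migrations, enabling the pgvector extension could look like this (the repo's actual migrations may differ and also create the vector store tables):

```python
# Illustrative Alembic migration: enable pgvector so embeddings can be stored.
from alembic import op


def upgrade() -> None:
    op.execute("CREATE EXTENSION IF NOT EXISTS vector")


def downgrade() -> None:
    op.execute("DROP EXTENSION IF EXISTS vector")
```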
Building a Durable Data Ingestion Pipeline
Now, let's write the document ingestion pipeline. Because ingesting and indexing documents may take a long time, the pipeline needs to be both concurrent and reliable: it should process multiple documents at once, and if the application is interrupted, restarted, or encounters an error, it should recover from where it left off rather than restarting from the beginning or losing documents entirely.
We'll build a concurrent, reliable data ingestion pipeline using DBOS queues and durable execution. This workflow takes in a batch of document URLs and enqueues them for ingestion. It then waits for them all to complete and counts how many total documents and pages were ingested. If it's ever interrupted or restarted, it recovers the ingestion of each document from the last completed step, guaranteeing that every document is ingested and none are lost.
```python
queue = Queue("indexing_queue")


@DBOS.workflow()
def indexing_workflow(urls: List[HttpUrl]):
    # Enqueue each document for concurrent ingestion.
    handles: List[WorkflowHandle] = []
    for url in urls:
        handle = queue.enqueue(index_document, url)
        handles.append(handle)
    # Wait for each document to finish, summing the pages indexed.
    indexed_pages = 0
    for handle in handles:
        indexed_pages += handle.get_result()
    DBOS.logger.info(f"Indexed {len(urls)} documents totaling {indexed_pages} pages")
```
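By default, the queue runs as many enqueued tasks concurrently as it can. If you want to bound parallelism (for example, to respect embedding API rate limits), DBOS queues accept an optional concurrency limit; for instance (the limit of 5 is illustrative):

```python
# Process at most 5 documents concurrently.
queue = Queue("indexing_queue", concurrency=5)
```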
Now, let's write the key step of this pipeline: the function that ingests a PDF document from a URL. It downloads the document, parses it into pages, then uses LlamaIndex to embed each page and store the embeddings in Postgres. We annotate this function with @DBOS.step() to mark it as a step in our indexing workflow. Additionally, to handle transient failures (for example, while downloading the document), we configure the step to automatically retry up to five times with exponential backoff.
```python
@DBOS.step(retries_allowed=True, max_attempts=5)
def index_document(document_url: HttpUrl) -> int:
    with TemporaryDirectory() as temp_dir:
        temp_file_path = os.path.join(temp_dir, "file.pdf")
        # Download the PDF to a temporary file, streaming it in chunks.
        with open(temp_file_path, "wb") as temp_file:
            with requests.get(str(document_url), stream=True) as r:
                r.raise_for_status()
                for chunk in r.iter_content(chunk_size=8192):
                    temp_file.write(chunk)
            temp_file.seek(0)
        # Parse the PDF into pages.
        reader = PDFReader()
        pages = reader.load_data(temp_file_path)
    # Embed each page and store the embeddings in Postgres.
    for page in pages:
        index.insert(page)
    return len(pages)
```
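The retry policy is tunable: in addition to max_attempts, DBOS steps accept parameters controlling the initial retry interval and the backoff multiplier. A sketch (the helper fetch_with_backoff and the parameter values are illustrative, not part of this app):

```python
# Retry up to 5 times, waiting 1 second before the first retry
# and doubling the wait after each subsequent failure.
@DBOS.step(retries_allowed=True, max_attempts=5, interval_seconds=1.0, backoff_rate=2.0)
def fetch_with_backoff(url: str) -> bytes:
    r = requests.get(url, timeout=30)
    r.raise_for_status()
    return r.content
```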
Next, let's write the endpoint for indexing. It starts the indexing workflow in the background on a batch of documents.
```python
class URLList(BaseModel):
    urls: List[HttpUrl]


@app.post("/index")
async def index_endpoint(urls: URLList):
    # Start the workflow in the background and return immediately.
    DBOS.start_workflow(indexing_workflow, urls.urls)
```
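DBOS.start_workflow returns a handle without waiting for the workflow to finish, so the request completes immediately. If callers should be able to track an ingestion run, a variant of this endpoint could return the workflow's ID (a sketch, assuming you want to expose it):

```python
@app.post("/index")
async def index_endpoint(urls: URLList):
    # The handle is available as soon as the workflow is started.
    handle = DBOS.start_workflow(indexing_workflow, urls.urls)
    return {"workflow_id": handle.get_workflow_id()}
```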
Chatting With Your Data
Now, let's build the backend for a chatbot agent you can use to ask questions about the documents you've ingested.
Each time we get a chat message, we call this workflow with three steps:
- Store the incoming chat message in Postgres.
- Query LlamaIndex to respond to the message using RAG.
- Store the response in Postgres.
```python
class ChatSchema(BaseModel):
    message: str


@app.post("/chat")
@DBOS.workflow()
def chat_workflow(chat: ChatSchema):
    # 1. Store the incoming message in Postgres.
    insert_chat(chat.message, True)
    # 2. Query LlamaIndex for a RAG response.
    response = query_model(chat.message)
    # 3. Store the response in Postgres.
    insert_chat(response, False)
    return {"content": response, "isUser": False}


@DBOS.transaction()
def insert_chat(content: str, is_user: bool):
    DBOS.sql_session.execute(
        chat_history.insert().values(content=content, is_user=is_user)
    )


@DBOS.step()
def query_model(message: str) -> str:
    return str(chat_engine.chat(message))
```
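Once the app is running, you can exercise this endpoint directly. For example (assuming the app is serving on localhost:8000):

```shell
curl -X POST "http://localhost:8000/chat" \
  -H "Content-Type: application/json" \
  -d '{"message": "What was total revenue in fiscal 2023?"}'
```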
Let's also write a history endpoint that retrieves all past chats from the database. This function is called when we open up the chatbot so it can display your chat history.
@app.get("/history")
def history_endpoint():
return get_chats()
@DBOS.transaction()
def get_chats():
stmt = chat_history.select().order_by(chat_history.c.created_at.asc())
result = DBOS.sql_session.execute(stmt)
return [{"content": row.content, "isUser": row.is_user} for row in result]
Finally, let's serve the app's frontend from an HTML file using FastAPI. In production, we recommend using DBOS primarily for the backend, with your frontend deployed elsewhere.
@app.get("/")
def frontend():
with open(os.path.join("html", "app.html")) as file:
html = file.read()
return HTMLResponse(html)
Try it Yourself!
Creating an OpenAI Account
To run this app, you need an OpenAI developer account. Obtain an API key from the OpenAI developer platform and set up a payment method for your account. This bot uses gpt-3.5-turbo for text generation, so make sure you have some credits (~$1) available. Set your API key as an environment variable:
```shell
export OPENAI_API_KEY=<your_openai_key>
```
Deploying to the Cloud
To deploy this app to DBOS Cloud, first install the DBOS Cloud CLI (requires Node):
```shell
npm i -g @dbos-inc/dbos-cloud
```
Then clone the dbos-demo-apps repository and deploy:
```shell
git clone https://github.com/dbos-inc/dbos-demo-apps.git
cd dbos-demo-apps/python/document-detective
dbos-cloud app deploy
```
This command outputs a URL. Visit it to see your chat agent! You can also visit the DBOS Cloud Console to see your app's status and logs.
Running Locally
First, clone and enter the dbos-demo-apps repository:
```shell
git clone https://github.com/dbos-inc/dbos-demo-apps.git
cd dbos-demo-apps/python/document-detective
```
Then create a virtual environment:
```shell
python3 -m venv .venv
source .venv/bin/activate
```
DBOS requires a Postgres database. If you don't already have one, you can start one with Docker:
```shell
export PGPASSWORD=dbos
python3 start_postgres_docker.py
```
Then run the app in the virtual environment:
```shell
pip install -r requirements.txt
dbos migrate
dbos start
```
Visit http://localhost:8000 to see your chat agent!
Indexing Documents
To index a batch of PDF documents, send a list of their URLs in a POST request to the /index endpoint. For example, try this cURL command to index Apple's SEC 10-K filings for 2021, 2022, and 2023. Use http://localhost:8000 as the application URL when running locally, or your app's URL if deployed to DBOS Cloud:
```shell
curl -X POST "http://<URL>/index" \
  -H "Content-Type: application/json" \
  -d '{"urls": ["https://d18rn0p25nwr6d.cloudfront.net/CIK-0000320193/faab4555-c69b-438a-aaf7-e09305f87ca3.pdf", "https://d18rn0p25nwr6d.cloudfront.net/CIK-0000320193/b4266e40-1de6-4a34-9dfb-8632b8bd57e0.pdf", "https://d18rn0p25nwr6d.cloudfront.net/CIK-0000320193/42ede86f-6518-450f-bc88-60211bf39c6d.pdf"]}'
```