Stocks Tracker

In this example, we use DBOS to build and deploy an app that tracks stock prices and sends SMS alerts when the price of a stock rises above a certain threshold. We don't want to miss a good opportunity to trade, so it's important that this workflow is punctual and reliable!

You can see the application live here. All source code is available on GitHub.

Import and Initialize the App

Let's start off by importing the necessary libraries and initializing the app.

import os
import datetime
import threading
import yfinance as yf
from twilio.rest import Client
from dbos import DBOS
from schema import stock_prices, alerts

DBOS()

Stock Prices Watcher as an online cron job

We'll use a DBOS Scheduled Workflow to make this job punctual and reliable. The crontab expression '* * * * *' runs the workflow once a minute. Under the hood, DBOS uses Postgres to store the job's execution status and schedule, so the job still runs even if the server is restarted.

@DBOS.scheduled('* * * * *')
@DBOS.workflow()
def fetch_stock_prices_workflow(scheduled_time: datetime.datetime, actual_time: datetime.datetime):
    # For simplicity, declare the list of stock symbols to track inline
    symbols = ['AAPL', 'GOOGL', 'AMZN', 'MSFT', 'TSLA', 'NVDA']
    # Fetch registered alerts
    registered_alerts = fetch_alerts()
    # Fetch the stock price for each symbol
    for symbol in symbols:
        price = fetch_stock_price(symbol)
        save_to_db(symbol, price)
        # If there is a registered alert for that symbol, send an SMS if the price is above the alert threshold
        if registered_alerts and symbol in registered_alerts:
            if price > registered_alerts[symbol].price_threshold:
                send_sms_alert(symbol, price, registered_alerts[symbol].phone_number)
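The alert check inside the loop can be tried out on its own, without DBOS or a database. The sketch below is only an illustration: the `Alert` dataclass and the `alerts_to_send` helper are hypothetical names that mirror the columns the workflow reads (`price_threshold`, `phone_number`), and the phone number is a placeholder.

```python
from dataclasses import dataclass


@dataclass
class Alert:
    # Hypothetical stand-in for a row of the alerts table.
    stock_symbol: str
    price_threshold: float
    phone_number: str


def alerts_to_send(prices, registered_alerts):
    """Return (symbol, price, phone_number) for every price above its threshold."""
    due = []
    for symbol, price in prices.items():
        alert = registered_alerts.get(symbol)
        # Same condition as the workflow: strictly above the threshold.
        if alert is not None and price > alert.price_threshold:
            due.append((symbol, price, alert.phone_number))
    return due


registered = {"AAPL": Alert("AAPL", 200.0, "+15550100")}
prices = {"AAPL": 201.5, "MSFT": 410.0}
print(alerts_to_send(prices, registered))
```

Because the workflow runs every minute and only compares the current price to the threshold, an alert is sent on each run for as long as the price stays above it.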

Fetching stock prices

We'll use the yfinance library to fetch stock prices and write a DBOS step to make it durable.

@DBOS.step()
def fetch_stock_price(symbol):
    stock = yf.Ticker(symbol)
    data = stock.history(period="1d")
    if data.empty:
        raise ValueError(f"No stock data found for symbol {symbol}.")
    # Convert to a plain float so the step returns a simple value
    price = float(data['Close'].iloc[0])
    DBOS.logger.info(f"Stock price for {symbol} is {price}")
    return price
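To give a feel for what "durable" means here, the toy sketch below records each step's result under a workflow ID and step position, so that re-running the same workflow after a crash reuses the recorded result instead of calling the function again. It is a conceptual illustration only, not how DBOS is implemented: an in-memory dict stands in for Postgres, and `run_step`, `checkpoints` and `fake_fetch_stock_price` are hypothetical names.

```python
# Toy illustration only: a dict stands in for the database table where
# step results would be recorded. This is not how DBOS is implemented.
checkpoints = {}
calls = []


def run_step(workflow_id, step_index, func, *args):
    """Run func once per (workflow_id, step_index); reuse the saved result afterwards."""
    key = (workflow_id, step_index)
    if key not in checkpoints:
        checkpoints[key] = func(*args)
    return checkpoints[key]


def fake_fetch_stock_price(symbol):
    # Hypothetical stand-in for the network call to yfinance.
    calls.append(symbol)
    return 123.45


# The first execution performs the call and records the result.
first = run_step("wf-1", 0, fake_fetch_stock_price, "AAPL")
# A recovery of the same workflow replays the step from the checkpoint.
replayed = run_step("wf-1", 0, fake_fetch_stock_price, "AAPL")
print(first, replayed, calls)
```

The function body runs only once for workflow "wf-1", even though the step is requested twice.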

Sending SMS alerts

We'll use the Twilio API to send SMS alerts and write a DBOS step to make it durable. You can sign up for a free Twilio account at https://www.twilio.com/try-twilio. We'll need three environment variables to store our Twilio account SID, auth token and phone number. The alerts themselves are configured on the app's Streamlit frontend.

twilio_account_sid = os.environ.get('TWILIO_ACCOUNT_SID')
twilio_auth_token = os.environ.get('TWILIO_AUTH_TOKEN')
twilio_phone_number = os.environ.get('TWILIO_PHONE_NUMBER')

@DBOS.step()
def send_sms_alert(symbol, price, to_phone_number):
    client = Client(twilio_account_sid, twilio_auth_token)
    message = client.messages.create(
        body=f"{symbol} stock price is {price}.",
        from_=twilio_phone_number,
        to=to_phone_number
    )
    DBOS.logger.info(f"SMS sent: {message.sid}")
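Since `os.environ.get` returns `None` for an unset variable, a missing Twilio setting would only surface when the first SMS is sent. One possible safeguard is to check the variables at startup. The sketch below assumes nothing beyond the three variable names used above; `load_twilio_settings` and `REQUIRED_VARS` are hypothetical names, not part of the app.

```python
import os

REQUIRED_VARS = ("TWILIO_ACCOUNT_SID", "TWILIO_AUTH_TOKEN", "TWILIO_PHONE_NUMBER")


def load_twilio_settings(environ=os.environ):
    """Return the Twilio settings, raising if any variable is unset or empty."""
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: environ[name] for name in REQUIRED_VARS}
```

Calling `load_twilio_settings()` before launching the app would turn a silent misconfiguration into an immediate, readable error.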

Saving stock prices to the database and fetching registered alerts

Let's write two small functions to retrieve registered alerts and save stock prices to the database. We'll use DBOS Transactions to make these steps durable and ensure they happen exactly once.

@DBOS.transaction()
def save_to_db(symbol, price):
    DBOS.sql_session.execute(stock_prices.insert().values(stock_symbol=symbol, stock_price=price))

@DBOS.transaction()
def fetch_alerts():
    query = alerts.select()
    return {alert.stock_symbol: alert for alert in DBOS.sql_session.execute(query).fetchall()}
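The `schema` module isn't shown here, so as a rough illustration of what these two functions do, here is a self-contained sketch that swaps in Python's built-in `sqlite3` for Postgres and SQLAlchemy. The column names come from the code above; the column types and the primary key on `stock_symbol` are assumptions made for the example.

```python
import sqlite3

# In-memory SQLite database standing in for the app's Postgres database.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("CREATE TABLE stock_prices (stock_symbol TEXT, stock_price REAL)")
conn.execute(
    "CREATE TABLE alerts ("
    "stock_symbol TEXT PRIMARY KEY, price_threshold REAL, phone_number TEXT)"
)


def save_to_db(symbol, price):
    # "with conn" commits on success and rolls back on an exception.
    with conn:
        conn.execute(
            "INSERT INTO stock_prices (stock_symbol, stock_price) VALUES (?, ?)",
            (symbol, price),
        )


def fetch_alerts():
    rows = conn.execute("SELECT * FROM alerts").fetchall()
    # Keyed by symbol, like the transaction above.
    return {row["stock_symbol"]: row for row in rows}


with conn:
    conn.execute("INSERT INTO alerts VALUES (?, ?, ?)", ("AAPL", 200.0, "+15550100"))
save_to_db("AAPL", 201.5)
alerts_by_symbol = fetch_alerts()
print(alerts_by_symbol["AAPL"]["price_threshold"])
```

Keying the result by symbol means the workflow sees at most one alert per stock, which is why the sketch makes `stock_symbol` unique.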

Launching the app

Finally, let's launch the app. We'll launch DBOS and have the main thread wait forever while the background threads run.

if __name__ == "__main__":
    DBOS.launch()
    threading.Event().wait()
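The `threading.Event().wait()` call is what keeps the process alive: it blocks the main thread until the event is set, and nothing ever sets it. The standalone sketch below shows the same mechanism with a hypothetical `background_job` worker. Unlike the app, the worker sets the event so that the example terminates, and a timeout guards against hanging.

```python
import threading

done = threading.Event()
ticks = []


def background_job():
    # Stand-in for the scheduled work DBOS runs on its own threads.
    for i in range(3):
        ticks.append(i)
    done.set()


worker = threading.Thread(target=background_job, daemon=True)
worker.start()

# wait() blocks the calling thread until the event is set. The app never
# sets its event, so its main thread blocks indefinitely; here the worker
# sets it and a timeout guards against hanging.
finished = done.wait(timeout=5)
print(finished, ticks)
```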