Transactions & Datasources
Introduction To Transactions And Datasources
DBOS workflows reliably execute a sequence of steps. A DBOS transaction is a special kind of step that runs a function within a database transaction and records its return value within that transaction. This guarantees that the transaction is executed exactly once, even in the face of retries or failures.
Different JavaScript libraries have different mechanisms for starting transactions and accessing the database. The DBOS datasource layer provides plugin packages for popular libraries, offering:
- Native access to each library’s database APIs
- A consistent transaction control interface
- Compile-time type safety
- Correct runtime behavior across all supported environments
Setting Up Data Sources
Each data source is implemented in a separate package. This package, along with its underlying database libraries, should be installed before use.
- @dbos-inc/drizzle-datasource: drizzle
- @dbos-inc/knex-datasource: Knex.js
- @dbos-inc/nodepg-datasource: node-postgres
- @dbos-inc/postgres-datasource: Postgres.js
- @dbos-inc/prisma-datasource: Prisma
- @dbos-inc/typeorm-datasource: TypeORM
Instantiating Datasources
In general, datasources should be configured and constructed during program load, noting that they will not open any database connections until later in the initialization process. This allows the datasource objects to be used to register transaction functions.
Configuration details vary slightly depending on the package used; Knex is shown in the example below:
class KnexDataSource {
/**
* @param name - A unique name for the datasource.
* @param config - A Knex configuration for the datasource. Passed directly into the Knex pool object.
*/
constructor(name: string, config: Knex.Config)
}
const config = {client: 'pg', connection: process.env.DBOS_DATABASE_URL}
const dataSource = new KnexDataSource('knex-ds', config);
Note that each datasource is given a name
upon construction. These names are used internally within DBOS and must be unique.
To support operation in DBOS Cloud, DBOS_DATABASE_URL
environment varialbe should be checked within configuration to connect to the primary application database.
Installing the DBOS Schema
DBOS datasources require an additional transaction_completion
table within the dbos
schema. This table is used for recordkeeping, ensuring that each transaction is run exactly once.
This table can be installed by running the initializeDBOSSchema
method of your datasource. You may do this as part of database schema migrations or at app startup. For example, here is a Knex migration file that installs the DBOS schema in Knex:
const {
KnexDataSource
} = require('@dbos-inc/knex-datasource');
exports.up = async function(knex) {
await KnexDataSource.initializeDBOSSchema(knex);
};
exports.down = async function(knex) {
await KnexDataSource.uninitializeDBOSSchema(knex);
};
Running Transactions
DBOS datasource transactions may be run using one of the following mechanisms:
- Directly inline in the code path, using
runTransaction
- Using a function wrapper returned from
registerTransaction
- Using a
@transaction
decorator on a class method
While datasource transactions are generally run inside workflows, this is not strictly necessary. Outside of a workflow, the transaction function will still execute, but execution guarantees are not provided.
dataSource.runTransaction()
runTransaction
allows code to be run "in line" within a datasource transaction, without pulling the code out into a separate named function.
runTransaction<T>(
func: () => Promise<T>,
config?: TransactionConfig & {name?: string}
)
Parameters:
- func: The function to run as a transaction.
- config: The transaction config, described below.
Knex Example:
async function workflowFunction() {
await dataSource.runTransaction(async () => {
await dataSource.client.raw('INSERT INTO example_table (name) VALUES (?)', ['dbos']);
}, {name: "insertRow"});
return await dataSource.runTransaction(async () => {
const result = await dataSource.client.raw('SELECT COUNT(*) as count FROM example_table');
const count = result.rows[0].count;
return count;
},
{name: "countRows", readOnly: true}
);
}
const workflow = DBOS.registerWorkflow(workflowFunction, "workflow");
dataSource.registerTransaction()
A transaction function may be registered, returning a wrapper function that remembers the associated datasource and transaction configuration. The returned function takes the same argument as the provided function, making it transparent to the caller.
registerTransaction<This, Args extends unknown[], Return>(
func: (this: This, ...args: Args) => Promise<Return>,
config?: TransactionConfig & {name?: string},
): (this: This, ...args: Args) => Promise<Return>
Wrap a function in a tranasction. Returns the wrapped function.
Parameters:
- func: The function to be wrapped in a transaction.
- config: The transaction config, documented below. The exact configuration type may vary depending on the datasource.
Knex Example:
async function insertRowFunction() {
await dataSource.client.raw('INSERT INTO example_table (name) VALUES (?)', ['dbos']);
}
const insertRowTransaction = dataSource.registerTransaction(insertRowFunction);
async function countRowsFunction() {
const result = await dataSource.client.raw('SELECT COUNT(*) as count FROM example_table');
const count = result.rows[0].count;
}
const countRowsTransaction = dataSource.registerTransaction(countRowsFunction);
async function workflowFunction() {
await insertRowTransaction();
await countRowsTransaction();
}
const workflow = DBOS.registerWorkflow(workflowFunction, "workflow")
dataSource.transaction() Decorators
Decorators can be used on class methods to mark them as transactional steps within a workflow.
Each datasource provides a transaction
decorator that accepts a TransactionConfig
appropriate to the datasource.
dataSource.transaction(
config?: TransactionConfig
)
For example, the Knex TransactionConfig
is:
interface TransactionConfig {
isolationLevel?: Knex.IsolationLevels;
readOnly?: boolean;
}
Parameters:
- config:
- isolationLevel: The Postgres isolation level of the transaction. Must be one of
read committed
,repeatable read
, orserializable
. Default isserializable
. - readOnly: Whether this transaction only performs reads. Optimizes checkpointing if so.
- isolationLevel: The Postgres isolation level of the transaction. Must be one of
Example:
@dataSource.transaction()
static async insertRow() {
await dataSource.client.raw('INSERT INTO example_table (name) VALUES (?)', ['dbos']);
}
@dataSource.transaction()
static async countRows() {
const result = await dataSource.client.raw('SELECT COUNT(*) as count FROM example_table');
const count = result.rows[0].count;
}
@DBOS.workflow()
static async transactionWorkflow() {
await Toolbox.insertRow()
await Toolbox.countRows()
}
Transaction Configuration
Transaction configuration varies depending on the underlying datasource package in use. Generally, fields similar to the following are supported:
name
: Provides a name for the function, which will be recorded in the DBOS step record. If not specified, thename
will be taken from the transactionsfunction
object.isolationLevel?: 'read uncommitted' | 'read committed' | 'repeatable read' | 'serializable'
: Allows the transaction isolation level to be set.accessMode?: 'read only' | 'read write'
orreadOnly?: boolean
: Allows a read-only transaction to be requested. Read-only transactions involve no writes to the underlying application database, but the result will be stored in the DBOS system database to allow for correct workflow behavior.