pgqrs supports multiple databases including PostgreSQL and SQLite.
An application uses pgqrs with one specific database, selected at build time through a Cargo feature.
[dependencies]
# PostgreSQL only (default)
pgqrs = "0.15.2"
# SQLite only
pgqrs = { version = "0.15.1", default-features = false, features = ["sqlite"] }
The choice of database determines:
- dialect
- client library
- specific functions such as handling timestamps.
The important detail is that these differences are not chosen per method call. Once the backend is selected, the dialect and timestamp handling are fixed.
This post explains why that kind of fixed-per-implementation difference maps well to Rust associated types.
The example uses one small part of pgqrs: WorkerTable.
Compile-Time Backend Variation
WorkerTable has methods like:
- resume
- suspend
- shutdown
- heartbeat
The logic is mostly shared across backends. The SQL is slightly different.
For example, resume means:
- transition a worker from suspended to ready
- fail if no row was updated
PostgreSQL:
UPDATE pgqrs_workers
SET status = 'ready'
WHERE id = $1 AND status = 'suspended'
SQLite:
UPDATE pgqrs_workers
SET status = 'ready'
WHERE id = ? AND status = 'suspended'
Once the backend implementation is chosen, the dialect used inside that implementation is fixed.
The design problem is:
- keep the public WorkerTable API the same
- share most of the implementation
- allow each backend to provide a small set of fixed details
Java: Interface, Abstract Class, Implementation
In Java, I would solve this with:
- an interface for the public contract
- an abstract class for shared behavior
- concrete classes for backend-specific details
interface WorkerTable {
    WorkerStatus getStatus(long workerId);
    void heartbeat(long workerId);
    void resume(long workerId);
    void suspend(long workerId);
    void shutdown(long workerId);
}
abstract class AbstractWorkerTable implements WorkerTable {
    protected abstract String heartbeatSql();
    protected abstract String resumeSql();
    protected abstract WorkerStatus queryWorkerStatus(long workerId);
    protected abstract long executeUpdate(String sql, Object... params);

    @Override
    public void heartbeat(long workerId) {
        long count = executeUpdate(heartbeatSql(), Instant.now(), workerId);
        if (count == 0) {
            throw new WorkerNotFound(workerId);
        }
    }

    @Override
    public void resume(long workerId) {
        long count = executeUpdate(resumeSql(), workerId);
        if (count == 0) {
            WorkerStatus status = queryWorkerStatus(workerId);
            throw new InvalidStateTransition(status, "ready");
        }
    }
}
This is similar to Rust trait objects (dyn Trait), which use a vtable for runtime polymorphism.
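For comparison, the same runtime-polymorphic shape can be sketched in Rust with a trait object. This is a simplified, synchronous illustration and not pgqrs code: `Status`, `InMemoryWorkerTable`, `resume_via_trait_object`, and the string errors are hypothetical stand-ins for the real types.

```rust
use std::collections::HashMap;

/// Simplified worker states for this sketch (hypothetical, not the pgqrs enum).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Status {
    Ready,
    Suspended,
}

/// Public contract, playing the role of the Java interface.
pub trait WorkerTable {
    fn status(&self, id: i64) -> Option<Status>;
    fn resume(&mut self, id: i64) -> Result<(), String>;
}

/// Hypothetical in-memory backend standing in for a real database.
pub struct InMemoryWorkerTable {
    pub rows: HashMap<i64, Status>,
}

impl WorkerTable for InMemoryWorkerTable {
    fn status(&self, id: i64) -> Option<Status> {
        self.rows.get(&id).copied()
    }

    fn resume(&mut self, id: i64) -> Result<(), String> {
        match self.rows.get_mut(&id) {
            // Only a suspended worker may move to ready.
            Some(s) if *s == Status::Suspended => {
                *s = Status::Ready;
                Ok(())
            }
            Some(s) => Err(format!("invalid transition from {:?} to Ready", s)),
            None => Err(format!("worker {} not found", id)),
        }
    }
}

/// The caller sees only `dyn WorkerTable`; the call is dispatched through a vtable.
pub fn resume_via_trait_object(table: &mut dyn WorkerTable, id: i64) -> Result<(), String> {
    table.resume(id)
}
```

The function accepts any implementation without knowing its concrete type, which is the property the Java interface provides.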
One Impl Per Backend Duplicates Code
The first attempt in Rust replicated the Java pattern in the previous section:
- define a WorkerTable trait
- implement it separately for each backend
#[async_trait]
pub trait WorkerTable: Send + Sync {
    async fn get_status(&self, id: i64) -> crate::error::Result<WorkerStatus>;
    async fn heartbeat(&self, id: i64) -> crate::error::Result<()>;
    async fn resume(&self, id: i64) -> crate::error::Result<()>;
    async fn suspend(&self, id: i64) -> crate::error::Result<()>;
    async fn shutdown(&self, id: i64) -> crate::error::Result<()>;
}
Then each backend implements the same execution pattern:
#[async_trait]
impl WorkerTable for SqliteWorkerTable {
    async fn heartbeat(&self, worker_id: i64) -> Result<()> {
        let now = Utc::now();
        let now_str = format_sqlite_timestamp(&now);
        let count = sqlx::query(
            "UPDATE pgqrs_workers SET heartbeat_at = $1 WHERE id = $2",
        )
        .bind(now_str)
        .bind(worker_id)
        .execute(&self.pool)
        .await?
        .rows_affected();
        if count == 0 {
            return Err(crate::error::Error::WorkerNotFound { id: worker_id });
        }
        Ok(())
    }
}

#[async_trait]
impl WorkerTable for PostgresWorkerTable {
    async fn heartbeat(&self, worker_id: i64) -> Result<()> {
        let now = Utc::now();
        let count = sqlx::query(
            "UPDATE pgqrs_workers SET heartbeat_at = $1 WHERE id = $2",
        )
        .bind(now)
        .bind(worker_id)
        .execute(&self.pool)
        .await?
        .rows_affected();
        if count == 0 {
            return Err(crate::error::Error::WorkerNotFound { id: worker_id });
        }
        Ok(())
    }
}
The duplicated part is:
- build a query
- bind parameters
- execute the update
- check row count
- map the failure to a domain error
The backend-specific part is much smaller:
- SQL placeholders such as $1 versus ?
- execution details
- timestamp formatting
- row decoding
Compile-Time and Run-Time Variation
At this point there are two design choices.
The first choice is runtime variation. Store an object like &dyn SqlStatements and ask it for SQL strings at runtime:
- concrete WorkerTable struct
- a &dyn SqlStatements inside it
- methods like resume_sql() and heartbeat_sql()
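A minimal sketch of this runtime option follows. The `resume_sql` and `heartbeat_sql` method names come from the list above and the SQL strings from the earlier snippets; `PostgresStatements`, `SqliteStatements`, and `resume_query` are hypothetical, and the database call is replaced by returning the SQL so the example stays self-contained.

```rust
/// Supplies SQL strings; which implementation is used is decided at runtime.
pub trait SqlStatements {
    fn resume_sql(&self) -> &'static str;
    fn heartbeat_sql(&self) -> &'static str;
}

pub struct PostgresStatements;
pub struct SqliteStatements;

impl SqlStatements for PostgresStatements {
    fn resume_sql(&self) -> &'static str {
        "UPDATE pgqrs_workers SET status = 'ready' WHERE id = $1 AND status = 'suspended'"
    }
    fn heartbeat_sql(&self) -> &'static str {
        "UPDATE pgqrs_workers SET heartbeat_at = $1 WHERE id = $2"
    }
}

impl SqlStatements for SqliteStatements {
    fn resume_sql(&self) -> &'static str {
        "UPDATE pgqrs_workers SET status = 'ready' WHERE id = ? AND status = 'suspended'"
    }
    fn heartbeat_sql(&self) -> &'static str {
        // Same text as the PostgreSQL variant in the snippets shown earlier.
        "UPDATE pgqrs_workers SET heartbeat_at = $1 WHERE id = $2"
    }
}

/// One concrete struct; the dialect is a field consulted on every call.
pub struct WorkerTable<'a> {
    pub statements: &'a dyn SqlStatements,
}

impl<'a> WorkerTable<'a> {
    /// Hypothetical stand-in for executing the update: returns the SQL it would run.
    pub fn resume_query(&self) -> &'static str {
        self.statements.resume_sql() // dynamic dispatch through the vtable
    }
}
```

Nothing in the type of `WorkerTable` records which dialect it holds, so the compiler cannot rule out pairing a SQLite pool with PostgreSQL statements.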
The second choice is compile-time variation. Make the dialect a type chosen by each backend implementation:
- public WorkerTable trait
- internal helper trait with default methods
- type Dialect selected by each implementation
The second option is a better fit here because SQL dialect is not a runtime choice. SQLite code will never suddenly use PostgreSQL SQL at runtime. The dialect is fixed by the backend implementation.
Associated Types for Compile-Time Variation
An associated type is a placeholder type inside a trait.
The trait says: every implementation must choose this type.
The implementation says: for me, the type is SqliteDialect or PostgresDialect.
After that choice is made, the compiler treats it as a concrete type.
In this example, DialectWorkerTable says:
- every worker table implementation must choose one Dialect
- shared code can refer to that choice as Self::Dialect
Then:
- SqliteWorkerTable chooses SqliteDialect
- PostgresWorkerTable chooses PostgresDialect
The compile-time design has three parts:
- a public trait for the contract
- an internal trait with default methods for shared behavior
- an associated type for the dialect
#[async_trait]
pub(crate) trait DialectWorkerTable: crate::store::WorkerTable + Sync {
    type Dialect: SqlDialect; // compile-time choice

    async fn execute_worker_update(&self, query: QueryBuilder) -> Result<u64>;
    async fn query_worker_status(&self, worker_id: i64) -> Result<WorkerStatus>;
    fn format_now(&self, now: DateTime<Utc>) -> String;

    async fn dialect_heartbeat(&self, worker_id: i64) -> Result<()> {
        // shared implementation
        let now = Utc::now();
        let now_str = self.format_now(now);
        let count = self
            .execute_worker_update(
                QueryBuilder::new(Self::Dialect::WORKER.heartbeat)
                    .bind_string(now_str)
                    .bind_i64(worker_id),
            )
            .await?;
        if count == 0 {
            return Err(crate::error::Error::WorkerNotFound { id: worker_id });
        }
        Ok(())
    }

    async fn dialect_resume(&self, worker_id: i64) -> Result<()> {
        // shared implementation
        let count = self
            .execute_worker_update(
                QueryBuilder::new(Self::Dialect::WORKER.resume).bind_i64(worker_id),
            )
            .await?;
        if count == 0 {
            let current_status = self.query_worker_status(worker_id).await?;
            return Err(crate::error::Error::InvalidStateTransition {
                from: current_status.to_string(),
                to: "ready".to_string(),
                reason: "Worker must be in Suspended state to resume".to_string(),
            });
        }
        Ok(())
    }
}
Two Rust features to focus on are:
- type Dialect is an associated type. Each implementation chooses a concrete dialect.
- dialect_heartbeat and dialect_resume are default methods. The shared logic lives once in the trait.
The dialect itself is just data:
pub(crate) struct WorkerSql {
    pub heartbeat: &'static str,
    pub resume: &'static str,
    pub suspend: &'static str,
    pub shutdown: &'static str,
}
Each dialect provides backend-specific SQL. Then each worker table chooses a dialect by filling in the associated type:
impl SqlDialect for SqliteDialect {
    const WORKER: WorkerSql = WorkerSql {
        heartbeat: r#"
            UPDATE pgqrs_workers SET heartbeat_at = $1 WHERE id = $2
        "#,
        resume: r#"
            UPDATE pgqrs_workers
            SET status = 'ready'
            WHERE id = $1 AND status = 'suspended'
        "#,
        suspend: r#"
            UPDATE pgqrs_workers
            SET status = 'suspended'
            WHERE id = $1 AND status IN ('ready', 'polling', 'interrupted')
        "#,
        shutdown: r#"
            UPDATE pgqrs_workers
            SET status = 'stopped', shutdown_at = $2
            WHERE id = $1 AND status = 'suspended'
        "#,
    };
}
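#[async_trait]
impl DialectWorkerTable for SqliteWorkerTable {
    type Dialect = SqliteDialect;

    fn format_now(&self, now: DateTime<Utc>) -> String {
        format_sqlite_timestamp(&now)
    }

    // execute_worker_update and query_worker_status have no default body in the
    // trait, so they must be implemented here too; they are omitted for brevity.
}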
The important properties of associated types are:
- the trait defines a placeholder for a type
- each implementation supplies one concrete type
Read type Dialect = SqliteDialect as:
For SqliteWorkerTable, whenever the shared trait code says Self::Dialect,
use SqliteDialect.
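To see that substitution end to end without a database, here is a minimal synchronous sketch. It mirrors the names above but is not the pgqrs implementation: `FakeDb`, `run_resume`, the `db` hook, the single `RESUME` constant, and the string errors are hypothetical simplifications, and the SQL is recorded rather than executed.

```rust
use std::collections::HashMap;

/// Per-dialect constants, similar in spirit to `WorkerSql`.
pub trait SqlDialect {
    const RESUME: &'static str;
}

pub struct PostgresDialect;
pub struct SqliteDialect;

impl SqlDialect for PostgresDialect {
    const RESUME: &'static str =
        "UPDATE pgqrs_workers SET status = 'ready' WHERE id = $1 AND status = 'suspended'";
}

impl SqlDialect for SqliteDialect {
    const RESUME: &'static str =
        "UPDATE pgqrs_workers SET status = 'ready' WHERE id = ? AND status = 'suspended'";
}

/// Hypothetical in-memory "database": worker id -> status, plus the last SQL seen.
#[derive(Default)]
pub struct FakeDb {
    pub rows: HashMap<i64, &'static str>,
    pub last_sql: Option<&'static str>,
}

impl FakeDb {
    /// Records the statement and mimics its effect; returns the rows affected.
    fn run_resume(&mut self, sql: &'static str, worker_id: i64) -> u64 {
        self.last_sql = Some(sql);
        match self.rows.get_mut(&worker_id) {
            Some(status) if *status == "suspended" => {
                *status = "ready";
                1
            }
            _ => 0,
        }
    }
}

/// Internal helper trait: one associated type, one hook, one shared method.
pub trait DialectWorkerTable {
    type Dialect: SqlDialect; // fixed per implementation

    /// Backend hook (hypothetical): access to the backend's connection.
    fn db(&mut self) -> &mut FakeDb;

    /// Shared logic, written once against `Self::Dialect`.
    fn dialect_resume(&mut self, worker_id: i64) -> Result<(), String> {
        let count = self.db().run_resume(Self::Dialect::RESUME, worker_id);
        if count == 0 {
            return Err(format!("worker {} is not in the suspended state", worker_id));
        }
        Ok(())
    }
}

pub struct SqliteWorkerTable {
    pub db: FakeDb,
}

pub struct PostgresWorkerTable {
    pub db: FakeDb,
}

impl DialectWorkerTable for SqliteWorkerTable {
    type Dialect = SqliteDialect;
    fn db(&mut self) -> &mut FakeDb {
        &mut self.db
    }
}

impl DialectWorkerTable for PostgresWorkerTable {
    type Dialect = PostgresDialect;
    fn db(&mut self) -> &mut FakeDb {
        &mut self.db
    }
}
```

Calling `dialect_resume` on a `SqliteWorkerTable` records the `?` statement, and on a `PostgresWorkerTable` the `$1` statement, although the method body exists only once.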
Associated Types Versus dyn
dyn Trait models runtime polymorphism. Associated types model compile-time structure chosen by each implementation.
&dyn WorkerTable means:
- a concrete implementing type exists
- the caller does not know which one
- method dispatch happens through a vtable
| Situation | Better fit |
|---|---|
| SQL dialect fixed by the backend implementation | Associated type |
| Shared logic plus backend-specific hooks | Trait with default methods + associated type |
| Implementation must remain open at runtime | dyn Trait |
Applied to this example:
- SqliteDialect and PostgresDialect are compile-time choices, so they should be types.
- WorkerTable can still be exposed behind dyn at an API boundary if implementation must remain open at runtime.
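One way the two mechanisms can coexist is sketched below, under assumptions: the public trait has no associated items, so it can be used as a trait object, while each implementation fixes its dialect through a second trait. The `open` function, the backend-name strings, `resume_statement`, and the blanket impl are hypothetical wiring for illustration; pgqrs may connect the traits differently.

```rust
pub trait SqlDialect {
    const RESUME: &'static str;
}

pub struct PostgresDialect;
pub struct SqliteDialect;

impl SqlDialect for PostgresDialect {
    const RESUME: &'static str =
        "UPDATE pgqrs_workers SET status = 'ready' WHERE id = $1 AND status = 'suspended'";
}

impl SqlDialect for SqliteDialect {
    const RESUME: &'static str =
        "UPDATE pgqrs_workers SET status = 'ready' WHERE id = ? AND status = 'suspended'";
}

/// Public contract with no associated types or consts, so `dyn WorkerTable` works.
pub trait WorkerTable {
    /// Hypothetical method: returns the SQL that resume would run.
    fn resume_statement(&self) -> &'static str;
}

/// Helper trait carrying the compile-time dialect choice.
pub trait DialectWorkerTable {
    type Dialect: SqlDialect;
}

pub struct PostgresWorkerTable;
pub struct SqliteWorkerTable;

impl DialectWorkerTable for PostgresWorkerTable {
    type Dialect = PostgresDialect;
}

impl DialectWorkerTable for SqliteWorkerTable {
    type Dialect = SqliteDialect;
}

/// Every helper-trait implementation gets the public trait for free.
impl<T: DialectWorkerTable> WorkerTable for T {
    fn resume_statement(&self) -> &'static str {
        T::Dialect::RESUME // resolved statically for each concrete T
    }
}

/// The runtime choice happens once, at the boundary.
pub fn open(backend: &str) -> Option<Box<dyn WorkerTable>> {
    match backend {
        "postgres" => Some(Box::new(PostgresWorkerTable)),
        "sqlite" => Some(Box::new(SqliteWorkerTable)),
        _ => None,
    }
}
```

Callers of `open` pay for one vtable lookup per call, while the dialect lookup inside each implementation involves no dynamic dispatch.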
Summary
As a Java developer:
- Java interface maps to a public Rust trait.
- Java abstract class maps to an internal Rust trait with default methods.
- Backend-specific differences that are fixed by the implementation should be modeled as types.
The rule I use now:
- use dyn Trait when the implementation must vary at runtime
- use an associated type when each implementation has to choose a fixed helper type
Java does not have a direct equivalent of Rust associated types, so they were not the first tool
I thought of when implementing support for multiple databases. For the pgqrs worker table, the SQL
dialect is fixed once the backend implementation is chosen. That makes associated types the right
abstraction.