pgqrs supports multiple databases, including PostgreSQL and SQLite.
An application uses pgqrs with one specific database, selected at build time through a Cargo feature.
[dependencies]
# PostgreSQL only (default)
pgqrs = "0.15.2"
# SQLite only
pgqrs = { version = "0.15.2", default-features = false, features = ["sqlite"] }
The choice of database determines:
- the SQL dialect
- the client library
- backend-specific functions, such as timestamp handling
Since these differences are fixed by the selected backend, associated types are a better fit than dyn, which is meant for runtime variation.
This post explains that choice using one small part of pgqrs: WorkerTable.
# Compile-Time Backend Variation
WorkerTable has methods like:
- resume
- suspend
- shutdown
- heartbeat
The logic is mostly shared across backends. The SQL is slightly different.
For example, resume means:
- transition a worker from suspended to ready
- fail if no row was updated
PostgreSQL:
UPDATE pgqrs_workers
SET status = 'ready'
WHERE id = $1 AND status = 'suspended'
SQLite:
UPDATE pgqrs_workers
SET status = 'ready'
WHERE id = ? AND status = 'suspended'
Once the backend implementation is chosen, the dialect used inside that implementation is fixed.
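A minimal sketch of what "fixed by the implementation" can look like in code, using only the standard library. The names Dialect, Postgres, Sqlite, and resume_sql are hypothetical and are not the pgqrs API; the SQL strings are the two statements shown above.

```rust
// Hypothetical names for illustration; not the pgqrs API.
trait Dialect {
    /// SQL for the resume transition in this dialect.
    const RESUME: &'static str;
}

struct Postgres;
struct Sqlite;

impl Dialect for Postgres {
    const RESUME: &'static str =
        "UPDATE pgqrs_workers SET status = 'ready' WHERE id = $1 AND status = 'suspended'";
}

impl Dialect for Sqlite {
    const RESUME: &'static str =
        "UPDATE pgqrs_workers SET status = 'ready' WHERE id = ? AND status = 'suspended'";
}

/// The dialect is a type parameter, so the SQL string is resolved
/// at compile time; nothing is looked up while the program runs.
fn resume_sql<D: Dialect>() -> &'static str {
    D::RESUME
}
```

Calling resume_sql::<Sqlite>() always yields the SQLite statement; there is no value that could select a different dialect at run time.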
# Java: Interface, Abstract Class, Implementation
In Java, I would solve this with:
- an interface for the public contract
- an abstract class for shared behavior
- concrete classes for backend-specific details
interface WorkerTable {
WorkerStatus getStatus(long workerId);
void heartbeat(long workerId);
void resume(long workerId);
void suspend(long workerId);
void shutdown(long workerId);
}
abstract class AbstractWorkerTable implements WorkerTable {
protected abstract String heartbeatSql();
protected abstract String resumeSql();
protected abstract WorkerStatus queryWorkerStatus(long workerId);
protected abstract long executeUpdate(String sql, Object... params);
@Override
public void heartbeat(long workerId) {
long count = executeUpdate(heartbeatSql(), Instant.now(), workerId);
if (count == 0) {
throw new WorkerNotFound(workerId);
}
}
@Override
public void resume(long workerId) {
long count = executeUpdate(resumeSql(), workerId);
if (count == 0) {
WorkerStatus status = queryWorkerStatus(workerId);
throw new InvalidStateTransition(status, "ready");
}
}
}
This is similar to Rust trait objects (dyn Trait), which use a vtable for runtime polymorphism.
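For comparison, here is a small sketch of the same runtime-polymorphic shape in Rust. It is synchronous and reduced to one method; the trait and struct names are simplified stand-ins, and open_worker_table is a hypothetical factory, not part of pgqrs.

```rust
// Simplified, synchronous stand-ins; not the real pgqrs types.
trait WorkerTable {
    /// SQL used to resume a worker on this backend.
    fn resume_sql(&self) -> &'static str;
}

struct PostgresWorkerTable;
struct SqliteWorkerTable;

impl WorkerTable for PostgresWorkerTable {
    fn resume_sql(&self) -> &'static str {
        "UPDATE pgqrs_workers SET status = 'ready' WHERE id = $1 AND status = 'suspended'"
    }
}

impl WorkerTable for SqliteWorkerTable {
    fn resume_sql(&self) -> &'static str {
        "UPDATE pgqrs_workers SET status = 'ready' WHERE id = ? AND status = 'suspended'"
    }
}

/// Hypothetical factory: the backend is picked from a runtime string,
/// so the caller only ever sees a Box<dyn WorkerTable>.
fn open_worker_table(backend: &str) -> Option<Box<dyn WorkerTable>> {
    match backend {
        "postgres" => Some(Box::new(PostgresWorkerTable)),
        "sqlite" => Some(Box::new(SqliteWorkerTable)),
        _ => None,
    }
}
```

Every call to resume_sql on the returned box goes through the vtable, which is the right cost to pay only when the concrete type really is unknown until run time.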
# One Impl Per Backend Duplicates Code
The first attempt in Rust replicated the Java pattern from the previous section:
- define a WorkerTable trait
- implement it separately for each backend
#[async_trait]
pub trait WorkerTable: Send + Sync {
async fn get_status(&self, id: i64) -> crate::error::Result<WorkerStatus>;
async fn heartbeat(&self, id: i64) -> crate::error::Result<()>;
async fn resume(&self, id: i64) -> crate::error::Result<()>;
async fn suspend(&self, id: i64) -> crate::error::Result<()>;
async fn shutdown(&self, id: i64) -> crate::error::Result<()>;
}
Then each backend implements the same execution pattern:
#[async_trait]
impl WorkerTable for SqliteWorkerTable {
async fn heartbeat(&self, worker_id: i64) -> Result<()> {
let now = Utc::now();
let now_str = format_sqlite_timestamp(&now);
let count = sqlx::query(
"UPDATE pgqrs_workers SET heartbeat_at = $1 WHERE id = $2",
)
.bind(now_str)
.bind(worker_id)
.execute(&self.pool)
.await?
.rows_affected();
if count == 0 {
return Err(crate::error::Error::WorkerNotFound { id: worker_id });
}
Ok(())
}
}
#[async_trait]
impl WorkerTable for PostgresWorkerTable {
async fn heartbeat(&self, worker_id: i64) -> Result<()> {
let now = Utc::now();
let count = sqlx::query(
"UPDATE pgqrs_workers SET heartbeat_at = $1 WHERE id = $2",
)
.bind(now)
.bind(worker_id)
.execute(&self.pool)
.await?
.rows_affected();
if count == 0 {
return Err(crate::error::Error::WorkerNotFound { id: worker_id });
}
Ok(())
}
}
The duplicated part is:
- build a query
- bind parameters
- execute the update
- check row count
- map the failure to a domain error
The backend-specific part is much smaller:
- SQL placeholders such as $1 versus ?
- execution details
- timestamp formatting
- row decoding
# Compile-Time and Run-Time Variation
The implementation in the previous section can be made more DRY in two ways. The first option pushes runtime dispatch down into a helper object:
- concrete WorkerTable struct
- a &dyn SqlStatements inside it
- methods like resume_sql() and heartbeat_sql()
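A rough sketch of this first option, assuming a synchronous, std-only setting. SqlStatements, resume_sql(), and heartbeat_sql() follow the names in the list above; PostgresStatements, SqliteStatements, and the resume_query and heartbeat_query helpers are hypothetical.

```rust
// Sketch of option one: runtime dispatch lives in a helper object.
trait SqlStatements {
    fn resume_sql(&self) -> &'static str;
    fn heartbeat_sql(&self) -> &'static str;
}

// Hypothetical statement providers, one per backend.
struct PostgresStatements;
struct SqliteStatements;

impl SqlStatements for PostgresStatements {
    fn resume_sql(&self) -> &'static str {
        "UPDATE pgqrs_workers SET status = 'ready' WHERE id = $1 AND status = 'suspended'"
    }
    fn heartbeat_sql(&self) -> &'static str {
        "UPDATE pgqrs_workers SET heartbeat_at = $1 WHERE id = $2"
    }
}

impl SqlStatements for SqliteStatements {
    fn resume_sql(&self) -> &'static str {
        "UPDATE pgqrs_workers SET status = 'ready' WHERE id = ? AND status = 'suspended'"
    }
    fn heartbeat_sql(&self) -> &'static str {
        "UPDATE pgqrs_workers SET heartbeat_at = ? WHERE id = ?"
    }
}

/// One concrete table type. The dialect is reached through a vtable
/// on every call, even though it never changes after construction.
struct WorkerTable<'a> {
    statements: &'a dyn SqlStatements,
}

impl<'a> WorkerTable<'a> {
    fn resume_query(&self) -> &'static str {
        self.statements.resume_sql()
    }

    fn heartbeat_query(&self) -> &'static str {
        self.statements.heartbeat_sql()
    }
}
```

The shared logic is written once, but the type system no longer records which dialect a given WorkerTable uses.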
The second option selects a concrete dialect type at compile time:
- public WorkerTable trait
- internal helper trait with default methods
- type Dialect selected by each implementation
The second option is a better fit here because SQL dialect is not a runtime choice. It is fixed by the backend implementation.
# Associated Types for Compile-Time Variation
The compile-time design has three parts:
- a public trait for the contract
- an internal trait with default methods for shared behavior
- an associated type for the dialect
#[async_trait]
pub(crate) trait DialectWorkerTable: crate::store::WorkerTable + Sync {
type Dialect: SqlDialect; // compile-time choice
async fn execute_worker_update(&self, query: QueryBuilder) -> Result<u64>;
async fn query_worker_status(&self, worker_id: i64) -> Result<WorkerStatus>;
fn format_now(&self, now: DateTime<Utc>) -> String;
async fn dialect_heartbeat(&self, worker_id: i64) -> Result<()> {
// shared implementation
let now = Utc::now();
let now_str = self.format_now(now);
let count = self
.execute_worker_update(
QueryBuilder::new(Self::Dialect::WORKER.heartbeat)
.bind_string(now_str)
.bind_i64(worker_id),
)
.await?;
if count == 0 {
return Err(crate::error::Error::WorkerNotFound { id: worker_id });
}
Ok(())
}
async fn dialect_resume(&self, worker_id: i64) -> Result<()> {
// shared implementation
let count = self
.execute_worker_update(
QueryBuilder::new(Self::Dialect::WORKER.resume).bind_i64(worker_id),
)
.await?;
if count == 0 {
let current_status = self.query_worker_status(worker_id).await?;
return Err(crate::error::Error::InvalidStateTransition {
from: current_status.to_string(),
to: "ready".to_string(),
reason: "Worker must be in Suspended state to resume".to_string(),
});
}
Ok(())
}
}
Two Rust features to focus on are:
- type Dialect is an associated type. Each implementation chooses a concrete dialect.
- dialect_heartbeat and dialect_resume are default methods. The shared logic lives once in the trait.
The dialect itself is just data:
pub(crate) struct WorkerSql {
pub heartbeat: &'static str,
pub resume: &'static str,
pub suspend: &'static str,
pub shutdown: &'static str,
}
A backend binds the abstract trait to a concrete dialect:
impl SqlDialect for SqliteDialect {
const WORKER: WorkerSql = WorkerSql {
heartbeat: r#"
UPDATE pgqrs_workers SET heartbeat_at = $1 WHERE id = $2
"#,
resume: r#"
UPDATE pgqrs_workers
SET status = 'ready'
WHERE id = $1 AND status = 'suspended'
"#,
suspend: r#"
UPDATE pgqrs_workers
SET status = 'suspended'
WHERE id = $1 AND status IN ('ready', 'polling', 'interrupted')
"#,
shutdown: r#"
UPDATE pgqrs_workers
SET status = 'stopped', shutdown_at = $2
WHERE id = $1 AND status = 'suspended'
"#,
};
}
#[async_trait]
impl DialectWorkerTable for SqliteWorkerTable {
type Dialect = SqliteDialect;
fn format_now(&self, now: DateTime<Utc>) -> String {
format_sqlite_timestamp(&now)
}
// execute_worker_update and query_worker_status are also required; omitted here for brevity.
}
The important properties of associated types are:
- the trait defines a placeholder for a type
- each implementation supplies one concrete type
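These two properties can be exercised in a small, self-contained sketch. It is synchronous and uses an in-memory map instead of a database, so it is only an approximation of the async trait above. InMemoryWorkerTable, with_suspended, execute_update, and the RESUME constant are hypothetical names introduced for illustration.

```rust
use std::collections::HashMap;

// Hypothetical, reduced dialect trait: one constant instead of WorkerSql.
trait SqlDialect {
    const RESUME: &'static str;
}

struct SqliteDialect;

impl SqlDialect for SqliteDialect {
    const RESUME: &'static str =
        "UPDATE pgqrs_workers SET status = 'ready' WHERE id = $1 AND status = 'suspended'";
}

trait DialectWorkerTable {
    /// Placeholder type; each implementation supplies exactly one.
    type Dialect: SqlDialect;

    /// Backend hook: run an update and return the affected row count.
    fn execute_update(&mut self, sql: &str, worker_id: i64) -> u64;

    /// Shared logic, written once as a default method.
    fn resume(&mut self, worker_id: i64) -> Result<(), String> {
        let sql = <Self::Dialect as SqlDialect>::RESUME;
        if self.execute_update(sql, worker_id) == 0 {
            return Err(format!("worker {} is not suspended", worker_id));
        }
        Ok(())
    }
}

/// Stand-in backend that keeps worker status in a map.
struct InMemoryWorkerTable {
    status: HashMap<i64, &'static str>,
}

impl InMemoryWorkerTable {
    fn with_suspended(worker_id: i64) -> Self {
        let mut status = HashMap::new();
        status.insert(worker_id, "suspended");
        InMemoryWorkerTable { status }
    }
}

impl DialectWorkerTable for InMemoryWorkerTable {
    type Dialect = SqliteDialect;

    fn execute_update(&mut self, _sql: &str, worker_id: i64) -> u64 {
        // Mimics the UPDATE: only a suspended worker is changed.
        match self.status.get_mut(&worker_id) {
            Some(s) if *s == "suspended" => {
                *s = "ready";
                1
            }
            _ => 0,
        }
    }
}
```

The implementation supplies only the associated type and the hook; resume itself is inherited from the trait and already knows which SQL to use.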
# Associated Types Versus dyn
The practical differences are:
dyn Trait models runtime polymorphism. Associated types model compile-time structure chosen by each implementation.
&dyn WorkerTable means:
- a concrete implementing type exists
- the caller does not know which one
- method dispatch happens through a vtable
| Situation | Better fit |
|---|---|
| SQL dialect fixed by the backend implementation | Associated type |
| Shared logic plus backend-specific hooks | Trait with default methods + associated type |
| Implementation must remain open at runtime | dyn Trait |
Applied to this example:
- SqliteDialect and PostgresDialect are compile-time choices, so they should be types.
- WorkerTable can still be exposed behind dyn at an API boundary if the implementation must remain open at runtime.
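The two mechanisms can coexist. The sketch below is one possible arrangement, not how pgqrs is structured: it uses a blanket impl to bridge an internal trait with an associated type to an object-safe public trait. The NAME constant, dialect_name, and all_tables are hypothetical.

```rust
// Hypothetical dialect trait with a single constant.
trait SqlDialect {
    const NAME: &'static str;
}

struct SqliteDialect;
struct PostgresDialect;

impl SqlDialect for SqliteDialect {
    const NAME: &'static str = "sqlite";
}

impl SqlDialect for PostgresDialect {
    const NAME: &'static str = "postgres";
}

/// Internal trait: the dialect is fixed per implementation.
trait DialectWorkerTable {
    type Dialect: SqlDialect;
}

/// Public trait: object safe, so it can sit behind dyn.
trait WorkerTable {
    fn dialect_name(&self) -> &'static str;
}

/// Bridge: every internal implementation gets the public trait.
/// Inside this impl the dialect is still resolved at compile time.
impl<T: DialectWorkerTable> WorkerTable for T {
    fn dialect_name(&self) -> &'static str {
        <T::Dialect as SqlDialect>::NAME
    }
}

struct SqliteWorkerTable;
struct PostgresWorkerTable;

impl DialectWorkerTable for SqliteWorkerTable {
    type Dialect = SqliteDialect;
}

impl DialectWorkerTable for PostgresWorkerTable {
    type Dialect = PostgresDialect;
}

/// API boundary: callers hold trait objects and do not know the backend.
fn all_tables() -> Vec<Box<dyn WorkerTable>> {
    let mut tables: Vec<Box<dyn WorkerTable>> = Vec::new();
    tables.push(Box::new(SqliteWorkerTable));
    tables.push(Box::new(PostgresWorkerTable));
    tables
}
```

Only the outermost call goes through a vtable; the dialect lookup inside each implementation remains a compile-time constant.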
# Summary
As an experienced Java developer, I map the concepts as follows:
- A Java interface maps to a public Rust trait.
- A Java abstract class maps to an internal Rust trait with default methods.
- Backend-specific differences that are fixed by the implementation should be modeled as types.
Java does not have a direct equivalent of Rust associated types, so they were not the first tool I thought of when implementing support for multiple databases. For the pgqrs worker table, the SQL dialect is fixed once the backend implementation is chosen. That makes associated types the right abstraction.