pytest and JUnit provide test fixtures to set up and tear down resources. For example, if a service uses a Postgres database, a fixture can start a Postgres container and provide the connection string to the test.
```python
import pytest
from testcontainers.postgres import PostgresContainer


@pytest.fixture(scope="session")
def postgres():
    with PostgresContainer("postgres:16") as pg:
        yield pg.get_connection_url()


def test_enqueue_message(postgres):
    conn = connect(postgres)
    # test logic...


def test_dequeue_message(postgres):
    conn = connect(postgres)
    # test logic...
```
The scope="session" parameter ensures the container starts once, when the first test that requests it runs, and stops after all tests complete. A test requests a fixture by naming it as a parameter: both tests above take a postgres parameter, so pytest passes each of them the connection URL yielded by the fixture.
JUnit 5 offers similar functionality through the Testcontainers extension and its @Testcontainers and @Container annotations:
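```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
class QueueTests {
    @Container
    static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:16");

    @Test
    void testEnqueueMessage() throws SQLException {
        Connection conn = DriverManager.getConnection(
            postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword());
        // test logic...
    }

    @Test
    void testDequeueMessage() throws SQLException {
        Connection conn = DriverManager.getConnection(
            postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword());
        // test logic...
    }
}
```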
The annotations are syntactic sugar that hides the @BeforeAll and @AfterAll boilerplate. The control flow of the extension is:
- Finds all static fields annotated with @Container
- Calls .start() before any tests run (via @BeforeAll)
- Calls .stop() after all tests complete (via @AfterAll)
- For non-static @Container fields, it uses beforeEach/afterEach instead for a per-test container lifecycle.
Prior Art in Rust
Rust’s built-in test framework has been intentionally minimal from the beginning: it has no fixtures and no setup or teardown hooks. The standard workaround is to call setup() and teardown() functions manually in each test.
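As a minimal sketch of that manual pattern, using only the standard library: the scratch directory below stands in for a real resource, and the names setup, teardown, and with_scratch_dir are hypothetical, chosen for illustration.

```rust
use std::fs;
use std::path::{Path, PathBuf};

/// Hypothetical per-test resource: a scratch directory under the OS temp dir.
fn setup(test_name: &str) -> PathBuf {
    let dir = std::env::temp_dir()
        .join(format!("fixture_demo_{}_{}", std::process::id(), test_name));
    fs::create_dir_all(&dir).expect("failed to create scratch dir");
    dir
}

/// Removes the scratch directory; errors are ignored because it may already be gone.
fn teardown(dir: &Path) {
    let _ = fs::remove_dir_all(dir);
}

/// Runs `body` between setup() and teardown().
/// If `body` panics, teardown() is skipped; a Drop guard would close that gap.
fn with_scratch_dir<T>(test_name: &str, body: impl FnOnce(&Path) -> T) -> T {
    let dir = setup(test_name);
    let result = body(&dir);
    teardown(&dir);
    result
}
```

Every test has to remember to call the wrapper, and the resource is rebuilt for each test, which is the cost a session-scoped fixture avoids.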
rstest provides pytest-like fixtures with #[fixture] and #[rstest] attributes:
```rust
use rstest::{fixture, rstest};

#[fixture]
fn repository() -> impl Repository {
    InMemoryRepository::new()
}

#[rstest]
fn should_find_user(repository: impl Repository) {
    // repository is injected, created fresh for this test
}
```
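However, rstest fixtures are function-scoped by default: each test gets a fresh instance. rstest does offer a #[once] attribute that computes a fixture a single time and shares a reference to it, but such fixtures cannot be async and their values are never dropped. That falls short of scope="session" for a database container, which must be started once and also stopped at the end.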
For session-scoped fixtures - starting a container once and sharing it across tests - I needed to build something custom.
A Session-Scoped Fixture Trait in Rust
cargo test works as follows:
- Compilation: Each test file (e.g., tests/queue_tests.rs) is compiled into a separate test binary.
- Execution: The binary runs all #[test] functions within it in parallel on separate threads.
- Isolation: Each test function runs independently with no shared state unless explicitly created.
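The execution model can be imitated with plain threads. This is a rough simulation, not the real harness: run_like_harness and the two test functions are hypothetical stand-ins, but they show why a static is visible to every test in one binary.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Process-wide state: visible to every test compiled into the same binary.
static TESTS_RUN: AtomicUsize = AtomicUsize::new(0);

fn test_enqueue() {
    TESTS_RUN.fetch_add(1, Ordering::SeqCst);
}

fn test_dequeue() {
    TESTS_RUN.fetch_add(1, Ordering::SeqCst);
}

/// Stand-in for the test harness: runs each test function on its own thread,
/// waits for all of them, and returns the shared counter.
fn run_like_harness(tests: &[fn()]) -> usize {
    let handles: Vec<_> = tests.iter().map(|t| thread::spawn(*t)).collect();
    for handle in handles {
        handle.join().expect("test thread panicked");
    }
    TESTS_RUN.load(Ordering::SeqCst)
}
```

Because all threads live in one process, the counter sees both tests. A second test binary would get its own copy of the static, which is why sharing stops at the binary boundary.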
Any resources in an integration test, like a database container, have to be created in every test:
```rust
#[tokio::test]
async fn test_enqueue_message() {
    let container = PostgresContainer::new().await; // Slow!
    let store = connect(&container.dsn).await;
    // test logic...
} // Container stops when dropped

#[tokio::test]
async fn test_dequeue_message() {
    let container = PostgresContainer::new().await; // Another slow start!
    let store = connect(&container.dsn).await;
    // test logic...
}
```
The ideal setup is to have a single container started before all tests and stopped after all tests. Resources can be shared:
- Global: Across all tests in all test binaries.
- Session: Across all tests in one test binary.
TestResource: A Session-Scoped Fixture Trait
The TestResource trait is an approximation of session-scoped pytest fixtures. It provides shared resources across the tests in a single test binary (one file under tests/), but with these caveats:
- No declarative syntax (no #[fixture] attribute)
- Manual session state management
- Cleanup requires external crate magic
```rust
use async_trait::async_trait;

#[async_trait]
pub trait TestResource: Send + Sync {
    async fn initialize(&self) -> Result<(), Box<dyn std::error::Error>>;
    async fn get_dsn(&self, schema: Option<&str>) -> String;
    async fn cleanup(&self) -> Result<(), Box<dyn std::error::Error>>;
}
```
RwLock for Exclusive Initialization and Concurrent Reuse
TestResource should be initialized once on first access and then reused across all tests. Tests within a binary run in parallel, so a read-write lock is needed to ensure that exactly one thread creates the resource. Every other thread either waits for the lock or starts after the resource already exists.
RwLock allows only one writer, which creates the resource at the start of the first test, and then allows multiple concurrent readers for the remainder of the test binary's run.
Lazy is a thread-safe lazy initialization wrapper that ensures the static is initialized on first access, not at program start. This ensures that all the context for the resource is available.
```rust
use once_cell::sync::Lazy;
use std::sync::RwLock;

pub struct ResourceManager {
    pub resource: Box<dyn TestResource>,
}

pub static RESOURCE_MANAGER: Lazy<RwLock<Option<ResourceManager>>> =
    Lazy::new(|| RwLock::new(None));
```
Access to the resource manager uses a double-check pattern to optimize for the read phase:
```rust
async fn initialize_global_resource(backend: BackendType) {
    // Fast path: check with read lock
    {
        let guard = RESOURCE_MANAGER.read().unwrap();
        if guard.is_some() {
            return; // Already initialized
        }
    }

    // Slow path: initialize with write lock
    let mut guard = RESOURCE_MANAGER.write().unwrap();
    if guard.is_some() {
        return; // Another thread initialized while we waited
    }

    let resource: Box<dyn TestResource> = match backend {
        BackendType::Postgres => {
            let r = PostgresResource::new();
            r.initialize().await.expect("Failed to init postgres");
            Box::new(r)
        }
        // ... other backends
    };
    *guard = Some(ResourceManager::new(resource));
}
```
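To see the double-check pattern in isolation, here is a std-only sketch. It assumes a plain String in place of the resource manager and relies on RwLock::new being usable directly in a static (stable since Rust 1.63), so Lazy is not needed here. RESOURCE, INIT_COUNT, get_or_init_dsn, and init_from_threads are illustrative names, not part of the original code.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;
use std::thread;

// Stand-in for the shared resource; the real one would own a container.
static RESOURCE: RwLock<Option<String>> = RwLock::new(None);
// Counts how many times the expensive setup actually ran.
static INIT_COUNT: AtomicUsize = AtomicUsize::new(0);

fn get_or_init_dsn() -> String {
    // Fast path: a shared read lock is enough once the value exists.
    {
        let guard = RESOURCE.read().unwrap();
        if let Some(dsn) = guard.as_ref() {
            return dsn.clone();
        }
    }
    // Slow path: take the write lock and check again, because another
    // thread may have initialized the value while this one was waiting.
    let mut guard = RESOURCE.write().unwrap();
    if guard.is_none() {
        INIT_COUNT.fetch_add(1, Ordering::SeqCst);
        *guard = Some(String::from("postgres://localhost:5432/test"));
    }
    guard.as_ref().expect("just initialized").clone()
}

/// Calls get_or_init_dsn from `n` threads and reports how often setup ran.
fn init_from_threads(n: usize) -> usize {
    let handles: Vec<_> = (0..n).map(|_| thread::spawn(get_or_init_dsn)).collect();
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    INIT_COUNT.load(Ordering::SeqCst)
}
```

However many threads race, the second check under the write lock keeps the setup count at one.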
For PostgreSQL, the resource wraps the container in another RwLock<Option<...>>:
```rust
pub struct PostgresResource {
    container: RwLock<Option<PostgresContainer>>,
}

#[async_trait]
impl TestResource for PostgresResource {
    async fn initialize(&self) -> Result<(), Box<dyn std::error::Error>> {
        let container = PostgresContainer::new().await?;
        let mut guard = self.container.write().unwrap();
        *guard = Some(container);
        Ok(())
    }

    async fn get_dsn(&self, schema: Option<&str>) -> String {
        let dsn = {
            let guard = self.container.read().unwrap();
            guard.as_ref()
                .expect("PostgresResource not initialized")
                .dsn.clone()
        };
        dsn
    }
    // ...
}
```
dtor for Session(Process)-Level Cleanup
The TestResource has to be cleaned up when the test binary shuts down.
The ctor crate provides #[dtor] which runs when the process exits:
```rust
use ctor::dtor;

#[dtor]
fn drop_database() {
    let mut guard = match RESOURCE_MANAGER.write() {
        Ok(g) => g,
        Err(e) => e.into_inner(), // Handle poisoned lock
    };
    if let Some(manager) = guard.take() {
        let rt = tokio::runtime::Runtime::new()
            .expect("Failed to create cleanup runtime");
        rt.block_on(async {
            if let Err(e) = manager.resource.cleanup().await {
                eprintln!("Error during resource cleanup: {}", e);
            }
        });
    }
}
```
Notes:
- #[dtor] runs when the test binary process exits
- guard.take() takes ownership to ensure cleanup happens exactly once
- A new Tokio runtime is created because the test runtime may already be shut down
- Poisoned locks are handled gracefully with into_inner()
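Two of these notes, exactly-once cleanup via take() and recovery from a poisoned lock, can be checked without the ctor crate. The sketch below is std-only and does not register a real exit hook; SLOT, CLEANUPS, cleanup_once, and poison_slot_with_resource are hypothetical names used only for this illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;
use std::thread;

// Stand-in for the global resource slot.
static SLOT: RwLock<Option<&'static str>> = RwLock::new(None);
// Counts how many times cleanup actually released a resource.
static CLEANUPS: AtomicUsize = AtomicUsize::new(0);

/// Mirrors the body of the exit hook: recover the guard even if the lock
/// is poisoned, then take() the resource so a second call finds None.
fn cleanup_once() -> bool {
    let mut guard = match SLOT.write() {
        Ok(g) => g,
        Err(poisoned) => poisoned.into_inner(),
    };
    match guard.take() {
        Some(_resource) => {
            CLEANUPS.fetch_add(1, Ordering::SeqCst);
            true
        }
        None => false,
    }
}

/// Simulates a test that stores the resource and then panics while still
/// holding the write lock, which poisons the lock.
fn poison_slot_with_resource() {
    let result = thread::spawn(|| {
        let mut guard = SLOT.write().unwrap();
        *guard = Some("container");
        panic!("simulated test failure while holding the lock");
    })
    .join();
    assert!(result.is_err());
}
```

The first cleanup_once call releases the resource despite the poisoned lock, and any later call is a no-op.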
The cleanup implementation for PostgreSQL:
```rust
async fn cleanup(&self) -> Result<(), Box<dyn std::error::Error>> {
    let container_opt = {
        let mut guard = self.container.write().unwrap();
        guard.take() // Take ownership, leaving None
    };
    if let Some(c) = container_opt {
        println!("Stopping PostgreSQL container...");
        let _ = c.container.stop().await;
        println!("Stopped.");
    }
    Ok(())
}
```
Usage
Tests access the database through helper functions:
```rust
mod common;

#[tokio::test]
async fn test_enqueue_message() {
    let dsn = common::get_test_dsn("test_enqueue").await;
    let store = connect(&dsn).await.expect("Failed to connect");
    // test logic...
}
```
The helper lazily initializes the global resource:
```rust
pub async fn get_test_dsn(schema: &str) -> String {
    initialize_global_resource(current_backend()).await;
    let guard = RESOURCE_MANAGER.read().unwrap();
    guard.as_ref().unwrap().resource.get_dsn(Some(schema)).await
}
```
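How the schema argument turns into a per-test DSN is not shown in this post. One possible approach, sketched here under the assumptions that the database driver honors the libpq-style options query parameter and that the schema has already been created, is to pin search_path in the connection URL. dsn_for_schema is a hypothetical helper, not the pgqrs implementation.

```rust
/// Hypothetical helper: derive a per-test DSN from the shared container's DSN.
/// With no schema the base DSN is returned unchanged.
fn dsn_for_schema(base_dsn: &str, schema: Option<&str>) -> Result<String, String> {
    match schema {
        None => Ok(base_dsn.to_string()),
        Some(name) => {
            // Accept only plain identifier characters so that nothing in the
            // name needs quoting or percent-encoding.
            let valid = !name.is_empty()
                && name.chars().all(|c| c.is_ascii_alphanumeric() || c == '_');
            if !valid {
                return Err(format!("invalid schema name: {name:?}"));
            }
            // Append to an existing query string if there is one.
            let sep = if base_dsn.contains('?') { '&' } else { '?' };
            // "-c search_path=<name>", percent-encoded for use in a URL query.
            Ok(format!("{base_dsn}{sep}options=-c%20search_path%3D{name}"))
        }
    }
}
```

Each test then works in its own namespace while all tests share one container, which is the trade-off behind isolating by schema name instead of by separate resources.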
Gaps Compared to pytest Fixtures
TestResource is a workable approximation. These are some of the major differences:
| pytest Feature | TestResource Status |
|---|---|
| scope="session" | Achieved via global static |
| scope="function" | Not supported |
| scope="module" | Not supported |
| Declarative @pytest.fixture | Requires explicit helper functions |
| Dependency injection | Manual parameter passing |
| Fixture composition | Must be manually wired |
| Teardown via yield | Requires #[dtor] crate magic |
| Per-test isolation | Via schema names, not separate resources |
A couple of other major callouts are:
- No automatic dependency resolution: pytest figures out fixture dependencies from function signatures. TestResource requires manual initialization order.
- Cleanup timing is imprecise: #[dtor] runs at process exit, not after tests complete. If the test binary crashes, cleanup may not run.
Conclusion
Rust’s built-in test framework lacks fixtures. Building session-scoped fixture semantics requires:
- Global state with Lazy<RwLock<Option<...>>> for lazy, thread-safe singleton initialization
- #[dtor] from the ctor crate for process-exit cleanup
The TestResource trait provides enough structure to share expensive resources like database containers across tests.
The full implementation is available in pgqrs.