Quick Start
Copy
Ask AI
import { makeTransactionStream, HttpTransport } from 'voltaire-effect/services'
import { Effect, Stream } from 'effect'
/**
 * Quick-start program: creates a transaction stream, watches for
 * transactions with `confirmations: 3`, and logs each one's hash.
 * The transport is supplied by `HttpTransport`.
 */
const program = Effect.gen(function* () {
  const txStream = yield* makeTransactionStream()
  const confirmed = txStream.watchConfirmed({ confirmations: 3 })

  // Runs the logging effect once per emitted event.
  yield* Stream.runForEach(confirmed, ({ transaction }) =>
    Effect.log(`Confirmed: ${transaction.hash}`)
  )
}).pipe(Effect.provide(HttpTransport('https://eth.llamarpc.com')))
Watch Pending Transactions
Monitor the mempool for pending transactions:
Ask AI
/**
 * Watches pending transactions filtered by `to: usdcAddress` and logs
 * the hash of each emitted event's transaction.
 */
const program = Effect.gen(function* () {
  const txStream = yield* makeTransactionStream()
  const pending = txStream.watchPending({ filter: { to: usdcAddress } })

  yield* Stream.runForEach(pending, ({ transaction }) =>
    Effect.log(`Pending: ${transaction.hash}`)
  )
})
Watch Confirmed Transactions
Watch for transactions that reach a confirmation threshold:
Ask AI
/**
 * Watches transactions filtered by `from: myAddress` with
 * `confirmations: 3`, logging the block number of each emitted
 * event's transaction.
 */
const program = Effect.gen(function* () {
  const txStream = yield* makeTransactionStream()
  const confirmed = txStream.watchConfirmed({
    filter: { from: myAddress },
    confirmations: 3,
  })

  yield* Stream.runForEach(confirmed, ({ transaction }) =>
    Effect.log(`Confirmed in block ${transaction.blockNumber}`)
  )
})
Track a Specific Transaction
Track a transaction through its full lifecycle (pending → confirmed/dropped):
Ask AI
/**
 * Tracks `txHash` with `confirmations: 3` and logs one message per
 * emitted event, chosen by the event's `type`.
 */
const program = Effect.gen(function* () {
  const txStream = yield* makeTransactionStream()
  const lifecycle = txStream.track(txHash, { confirmations: 3 })

  yield* Stream.runForEach(lifecycle, (event) => {
    switch (event.type) {
      case 'confirmed':
        return Effect.log(`Confirmed in block ${event.transaction.blockNumber}`)
      case 'dropped':
        return Effect.log(`Dropped: ${event.reason}`)
      default:
        // Any other event type is reported as still pending.
        return Effect.log(`Pending...`)
    }
  })
})
Configuration Options
Copy
Ask AI
/** Options accepted by `watchPending`. */
interface WatchPendingOptions {
/** Optional filter by from/to address. */
filter?: TransactionFilter
}
/** Options accepted by `watchConfirmed`. */
interface WatchConfirmedOptions {
/** Optional filter by from/to address. */
filter?: TransactionFilter
/** Required number of confirmations (default: 1). */
confirmations?: number
}
/** Options accepted by `track`. */
interface TrackOptions {
/** Required number of confirmations (default: 1). */
confirmations?: number
}
Request Configuration
Use Effect-native helpers for timeout and retry:
Ask AI
import { withTimeout, withRetrySchedule } from 'voltaire-effect'
import { Schedule } from 'effect'
// Track txHash (3 confirmations) with a 30 second Stream.timeout applied
const tracked = Stream.timeout(
  txStream.track(txHash, { confirmations: 3 }),
  "30 seconds"
)
// Pending watch retried on a custom schedule:
// exponential from "1 second", jittered, composed with recurs(3)
const pendingWithRetry = Stream.retry(
  txStream.watchPending({ filter: { to: usdcAddress } }),
  Schedule.exponential("1 second").pipe(
    Schedule.jittered,
    Schedule.compose(Schedule.recurs(3))
  )
)
Error Handling
Copy
Ask AI
import { TransactionStreamError } from 'voltaire-effect/services'
// Handle failures tagged "TransactionStreamError" by logging their message.
Effect.catchTag(program, "TransactionStreamError", (error) =>
  Effect.log(`Stream error: ${error.message}`)
)
Stream Shape
Copy
Ask AI
import { makeTransactionStream, type TransactionStreamShape } from 'voltaire-effect/services'
// The value yielded by makeTransactionStream() is annotated as TransactionStreamShape.
// NOTE: `yield*` is only valid inside a generator function (e.g. the
// `function* () { ... }` passed to Effect.gen), so this line belongs there.
const txStream: TransactionStreamShape = yield* makeTransactionStream()
Dependencies
Copy
Ask AI
makeTransactionStream ─── requires ──→ TransportService
Copy
Ask AI
// Provide the HTTP transport to `program`, then run it as a Promise
Effect.runPromise(
  Effect.provide(program, HttpTransport('https://eth.llamarpc.com'))
)

