Skip to main content

Quick Start

import { makeTransactionStream, HttpTransport } from 'voltaire-effect/services'
import { Effect, Stream } from 'effect'

// Quick start: log every transaction once it has 3 confirmations.
const program = Effect.gen(function* () {
  const txStream = yield* makeTransactionStream()
  const confirmed = txStream.watchConfirmed({ confirmations: 3 })

  // Runs the logging effect for each event emitted by the stream.
  yield* Stream.runForEach(confirmed, ({ transaction }) =>
    Effect.log(`Confirmed: ${transaction.hash}`)
  )
}).pipe(
  // makeTransactionStream needs a transport; supply an HTTP one here.
  Effect.provide(HttpTransport('https://eth.llamarpc.com'))
)

Watch Pending Transactions

Monitor the mempool for pending transactions:
const program = Effect.gen(function* () {
  const txStream = yield* makeTransactionStream()
  // Restrict the pending stream to transactions whose `to` is usdcAddress.
  const pendingToUsdc = txStream.watchPending({ filter: { to: usdcAddress } })

  yield* Stream.runForEach(pendingToUsdc, ({ transaction }) =>
    Effect.log(`Pending: ${transaction.hash}`)
  )
})

Watch Confirmed Transactions

Watch for transactions that reach a confirmation threshold:
const program = Effect.gen(function* () {
  const txStream = yield* makeTransactionStream()
  // Transactions sent from myAddress, emitted once they have 3 confirmations.
  const confirmedFromMe = txStream.watchConfirmed({
    filter: { from: myAddress },
    confirmations: 3
  })

  yield* Stream.runForEach(confirmedFromMe, ({ transaction }) =>
    Effect.log(`Confirmed in block ${transaction.blockNumber}`)
  )
})

Track a Specific Transaction

Track a transaction through its full lifecycle (pending → confirmed/dropped):
const program = Effect.gen(function* () {
  const txStream = yield* makeTransactionStream()
  // Lifecycle events for one transaction hash, confirmed after 3 blocks.
  const lifecycle = txStream.track(txHash, { confirmations: 3 })

  yield* Stream.runForEach(lifecycle, (event) => {
    // Branch on the event's `type` discriminant.
    switch (event.type) {
      case 'confirmed':
        return Effect.log(`Confirmed in block ${event.transaction.blockNumber}`)
      case 'dropped':
        return Effect.log(`Dropped: ${event.reason}`)
      default:
        // Any other event type is reported as still pending.
        return Effect.log(`Pending...`)
    }
  })
})

Configuration Options

/** Options object passed to `txStream.watchPending`. */
interface WatchPendingOptions {
  filter?: TransactionFilter     // Filter by from/to address
}

/**
 * Options object passed to `txStream.watchConfirmed`: an optional address
 * filter plus the number of confirmations to wait for.
 */
interface WatchConfirmedOptions {
  filter?: TransactionFilter     // Filter by from/to address
  confirmations?: number         // Required confirmations (default: 1)
}

/**
 * Options object passed as the second argument of `txStream.track`, which
 * follows a single transaction hash.
 */
interface TrackOptions {
  confirmations?: number         // Required confirmations (default: 1)
}

Request Configuration

Use Effect-native helpers for timeout and retry:
import { withTimeout, withRetrySchedule } from 'voltaire-effect'
// `Stream` is required for Stream.timeout / Stream.retry used below.
import { Schedule, Stream } from 'effect'

// Track with 30 second timeout
// (Stream.timeout ends the stream when no event is emitted within the duration)
const tracked = txStream.track(txHash, { confirmations: 3 }).pipe(
  Stream.timeout("30 seconds")
)

// Custom retry schedule for pending watch
const pendingWithRetry = txStream
  .watchPending({ filter: { to: usdcAddress } })
  .pipe(
    Stream.retry(
      Schedule.exponential("1 second").pipe(
        Schedule.jittered,
        Schedule.compose(Schedule.recurs(3))
      )
    )
  )

Error Handling

import { TransactionStreamError } from 'voltaire-effect/services'

// Effects are immutable descriptions: `pipe` returns a NEW effect and leaves
// `program` untouched, so keep the result and run that one.
const handled = program.pipe(
  // Recover from failures tagged "TransactionStreamError" by logging them.
  Effect.catchTag("TransactionStreamError", (e) =>
    Effect.log(`Stream error: ${e.message}`)
  )
)

Stream Shape

import { makeTransactionStream, type TransactionStreamShape } from 'voltaire-effect/services'

// `yield*` is only valid inside a generator, e.g. the body of
// Effect.gen(function* () { ... }) as in the examples above.
const txStream: TransactionStreamShape = yield* makeTransactionStream()

Dependencies

makeTransactionStream ─── requires ──→ TransportService
// Use with program: supply the transport requirement, then execute it
Effect.runPromise(
  Effect.provide(program, HttpTransport('https://eth.llamarpc.com'))
)