Skip to main content

Quick Start

import { makeBlockStream, HttpTransport } from 'voltaire-effect/services'
import { Effect, Stream } from 'effect'

const program = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()
  
  yield* Stream.runForEach(
    blockStream.watch({ include: 'transactions' }),
    (event) => {
      if (event.type === 'reorg') {
        return Effect.log(`Reorg: ${event.removed.length} blocks removed`)
      }
      return Effect.log(`Block ${event.blocks[0]?.header.number}`)
    }
  )
}).pipe(
  Effect.provide(HttpTransport('https://eth.llamarpc.com'))
)

Backfill Historical Blocks

Fetch blocks in a range with dynamic chunking:
const program = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()
  
  yield* Stream.runForEach(
    blockStream.backfill({
      fromBlock: 18000000n,
      toBlock: 18000100n,
      include: 'receipts'  // 'header' | 'transactions' | 'receipts'
    }),
    (event) => Effect.log(`Got ${event.blocks.length} blocks`)
  )
})
Include levels:
  • header - Block header only (minimal data)
  • transactions - Header + full transaction objects
  • receipts - Header + transactions + transaction receipts

Watch New Blocks

Poll for new blocks with reorg detection:
const program = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()
  
  yield* Stream.runForEach(
    blockStream.watch({ include: 'transactions' }),
    (event) => {
      if (event.type === 'reorg') {
        // Handle chain reorganization
        for (const removed of event.removed) {
          console.log('Rollback block:', removed.number)
        }
        for (const added of event.added) {
          console.log('Apply new block:', added.header.number)
        }
        return Effect.void
      }
      
      // Normal new blocks
      for (const block of event.blocks) {
        console.log('New block:', block.header.number)
      }
      return Effect.void
    }
  )
})

Reorg Handling

BlockStream detects chain reorganizations automatically:
type BlockStreamEvent<T> = BlocksEvent<T> | ReorgEvent<T>

interface ReorgEvent<T> {
  type: 'reorg'
  removed: LightBlock[]       // Blocks removed (newest first)
  added: StreamBlock<T>[]     // Blocks added (oldest first)
  commonAncestor: LightBlock  // Last block before divergence
  metadata: { chainHead: bigint }
}

interface BlocksEvent<T> {
  type: 'blocks'
  blocks: StreamBlock<T>[]
  metadata: { chainHead: bigint }
}
Reorg handling pattern:
  1. Undo state for removed blocks (newest to oldest)
  2. Apply state for added blocks (oldest to newest)

Cancellation

Use Effect Scope for clean cancellation:
import { Scope } from 'effect'

const program = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()
  
  // Take first 10 blocks then stop
  yield* Stream.take(10)(
    blockStream.watch()
  ).pipe(Stream.runCollect)
})

// Or with manual scope
const program = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()
  const scope = yield* Scope.make()
  
  // Stream will stop when scope closes
  yield* Effect.addFinalizer(scope, () => Effect.log('Stopped watching'))
})

Configuration Options

interface WatchOptions<T extends BlockInclude> {
  include?: T                    // Content level (default: 'header')
  fromBlock?: bigint             // Start from block (default: current)
}

interface BackfillOptions<T extends BlockInclude> {
  fromBlock: bigint              // Start block (required)
  toBlock: bigint                // End block (required)
  include?: T                    // Content level (default: 'header')
}

Request Configuration

Use Effect-native helpers for per-request timeout and retry:
import { withTimeout, withRetrySchedule } from 'voltaire-effect'
import { Schedule } from 'effect'

// Custom timeout for slow RPCs
const program = blockStream.watch().pipe(
  Stream.mapEffect((event) =>
    processEvent(event).pipe(withTimeout("30 seconds"))
  )
)

// Custom retry with exponential backoff
const backfillWithRetry = blockStream
  .backfill({ fromBlock: 18000000n, toBlock: 18000100n })
  .pipe(
    Stream.retry(
      Schedule.exponential("500 millis").pipe(
        Schedule.jittered,
        Schedule.compose(Schedule.recurs(5))
      )
    )
  )

Error Handling

import { BlockStreamError } from 'voltaire-effect/services'

program.pipe(
  Effect.catchTag("BlockStreamError", (e) =>
    Effect.log(`Stream error: ${e.message}`)
  )
)

Stream Shape

import { makeBlockStream, type BlockStreamShape } from 'voltaire-effect/services'

const blockStream: BlockStreamShape = yield* makeBlockStream()

Dependencies

makeBlockStream ─── requires ──→ TransportService
// Use with program
Effect.runPromise(program.pipe(
  Effect.provide(HttpTransport('https://eth.llamarpc.com'))
))