Skip to main content

Watch New Blocks

Stream new blocks in real-time with automatic reorg handling.
import { Effect, Stream } from 'effect'
import { makeBlockStream, HttpTransport } from 'voltaire-effect/services'

// Watches the chain head and logs a one-line summary for every event the
// stream emits, using a public HTTP transport.
const program = Effect.provide(
  Effect.gen(function* () {
    const blockStream = yield* makeBlockStream()
    const liveEvents = blockStream.watch({ include: 'transactions' })

    yield* Stream.runForEach(liveEvents, (event) => {
      // Reorg events are summarized by how many blocks they removed.
      if (event.type === 'reorg') {
        return Effect.log(`Reorg! Removed ${event.removed.length} blocks`)
      }

      // Otherwise report on the first block of the batch (may be absent).
      const first = event.blocks[0]
      return Effect.log(`Block ${first?.header.number}: ${first?.transactions.length} txs`)
    })
  }),
  HttpTransport('https://eth.llamarpc.com')
)

await Effect.runPromise(program)

Backfill Historical Blocks

Fetch blocks in a range with automatic chunking for large ranges.
// Backfills a fixed block range and reports how many blocks and
// transactions were seen across all emitted batches.
const backfillBlocks = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()

  // Running totals, updated from inside the stream handler.
  const totals = { blocks: 0, txs: 0 }

  const history = blockStream.backfill({
    fromBlock: 18000000n,
    toBlock: 18000100n,
    include: 'transactions'
  })

  yield* Stream.runForEach(history, (event) =>
    Effect.sync(() => {
      event.blocks.forEach((block) => {
        totals.blocks += 1
        totals.txs += block.transactions.length
      })
    })
  )

  yield* Effect.log(`Processed ${totals.blocks} blocks with ${totals.txs} transactions`)
})

Include Levels

Control the data fetched per block:
// The three calls below are identical except for the `include` option.
// `blockStream` is assumed to come from `makeBlockStream()` as in the other
// examples; the returned streams are not consumed here.

// Headers only (minimal data, fastest)
blockStream.backfill({ 
  fromBlock: 18000000n, 
  toBlock: 18001000n,
  include: 'header' 
})

// Headers + full transaction objects
blockStream.backfill({ 
  fromBlock: 18000000n, 
  toBlock: 18001000n,
  include: 'transactions' 
})

// Headers + transactions + receipts (most data)
blockStream.backfill({ 
  fromBlock: 18000000n, 
  toBlock: 18001000n,
  include: 'receipts' 
})

Handle Reorganizations

BlockStream detects chain reorgs and provides undo/redo information:
// Watches new blocks and shows where rollback/apply logic belongs when the
// stream reports a reorg.
const handleReorgs = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()

  yield* Stream.runForEach(
    blockStream.watch({ include: 'transactions' }),
    // The handler is a single generator-based effect so that `yield*` is
    // legal on both the reorg path and the normal path.
    (event) =>
      Effect.gen(function* () {
        if (event.type === 'reorg') {
          // Undo state for removed blocks (newest first)
          for (const removed of event.removed) {
            yield* Effect.log(`Rollback block ${removed.number}`)
            // Your rollback logic here
          }

          // Apply state for added blocks (oldest first)
          for (const added of event.added) {
            yield* Effect.log(`Apply block ${added.header.number}`)
            // Your apply logic here
          }

          yield* Effect.log(`Common ancestor: ${event.commonAncestor.number}`)
          return
        }

        // Normal new blocks
        for (const block of event.blocks) {
          yield* Effect.log(`New block ${block.header.number}`)
        }
      })
  )
})

Stream Processing

Use Effect Stream operators for composable transformations:
// Turns a backfill into a stream of per-block summaries, keeps the first 50
// that contain transactions, and logs how many were collected.
const processBlocks = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()

  // Projection from a full block to the fields we care about.
  const summarize = (block) => ({
    number: block.header.number,
    txCount: block.transactions.length,
    gasUsed: block.header.gasUsed,
    timestamp: block.header.timestamp
  })

  const events = blockStream.backfill({
    fromBlock: 18000000n,
    toBlock: 18000100n,
    include: 'transactions'
  })

  const summaries = events.pipe(
    // Flatten each batch event into its individual blocks
    Stream.flatMap((event) => Stream.fromIterable(event.blocks)),
    Stream.map((block) => summarize(block)),
    // Drop empty blocks
    Stream.filter((summary) => summary.txCount > 0),
    // Stop after the first 50 matches
    Stream.take(50)
  )

  const collected = yield* Stream.runCollect(summaries)
  yield* Effect.log(`Found ${collected.length} blocks with transactions`)
})

Progress Tracking

Track progress for large backfill operations:
// Backfills a large header-only range and logs progress roughly every
// 1000 blocks, plus once when the expected total is reached.
const backfillWithProgress = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()

  const fromBlock = 18000000n
  const toBlock = 18100000n
  // NOTE(review): if backfill treats `toBlock` as inclusive, the range holds
  // one more block than this — confirm against the BlockStream docs.
  const totalBlocks = Number(toBlock - fromBlock)
  const logEvery = 1000
  let processedBlocks = 0
  // Next cumulative count at which a progress line is due.
  let nextLogAt = logEvery

  yield* Stream.runForEach(
    blockStream.backfill({ fromBlock, toBlock, include: 'header' }),
    (event) =>
      Effect.gen(function* () {
        processedBlocks += event.blocks.length

        // Log when a threshold is crossed rather than hit exactly, so batch
        // sizes that do not divide `logEvery` still produce progress output.
        const reachedThreshold = processedBlocks >= nextLogAt
        if (!reachedThreshold && processedBlocks !== totalBlocks) {
          return
        }

        if (reachedThreshold) {
          // Move to the first multiple of `logEvery` above the current count.
          nextLogAt = (Math.floor(processedBlocks / logEvery) + 1) * logEvery
        }

        const progress = (processedBlocks / totalBlocks) * 100
        yield* Effect.log(`Progress: ${progress.toFixed(1)}% (${processedBlocks}/${totalBlocks})`)
      })
  )

  yield* Effect.log('Backfill complete!')
})

Combine Backfill + Watch

Seamlessly transition from historical to live streaming:
// Replays history from a known block and then keeps following the chain
// head, logging each event until 200 events have been handled.
const historicalToLive = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()

  // Backfill from a known block
  const backfill = blockStream.backfill({
    fromBlock: 18000000n,
    toBlock: 'latest',
    include: 'header'
  })

  // Watch for new blocks
  const watch = blockStream.watch({ include: 'header' })

  // Combine: backfill first, then continue watching
  const combined = Stream.concat(backfill, watch)

  yield* Stream.runForEach(
    // Stop after 200 events; each event is a batch, not a single block
    Stream.take(combined, 200),
    (event) =>
      // The watch stream can emit reorg events; only read `blocks` on
      // non-reorg events, matching the other watch examples.
      event.type === 'reorg'
        ? Effect.log(`Reorg! Removed ${event.removed.length} blocks`)
        : Effect.log(`Block batch: ${event.blocks.map((b) => b.header.number).join(', ')}`)
  )
})

Cancellation

Gracefully stop streaming:
import { Effect, Stream, Fiber } from 'effect'
import { makeBlockStream, HttpTransport, withTimeout } from 'voltaire-effect/services'

// Watches new blocks in a background fiber and interrupts it after a fixed
// delay.
const withTimeoutExample = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()

  // Run for 30 seconds then stop
  const fiber = yield* Effect.fork(
    Stream.runForEach(
      blockStream.watch({ include: 'header' }),
      (event) =>
        // Guard the reorg case so the forked fiber does not fail on an
        // event where `blocks` is not read in the other examples.
        event.type === 'reorg'
          ? Effect.log(`Reorg! Removed ${event.removed.length} blocks`)
          : Effect.log(`Block ${event.blocks[0]?.header.number}`)
    )
  )

  yield* Effect.sleep("30 seconds")
  yield* Fiber.interrupt(fiber)
  yield* Effect.log('Stopped watching')
})

// Or use withTimeout helper for per-request timeouts
// The generator body is elided; the resulting effect is piped through
// withTimeout("30 seconds") and then provided with the HTTP transport.
// NOTE(review): the exact timeout semantics are defined by voltaire-effect's
// `withTimeout` helper, which is not shown here — confirm in its docs.
const program = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()
  // ...
}).pipe(
  withTimeout("30 seconds"),
  Effect.provide(HttpTransport('https://eth.llamarpc.com'))
)

Error Handling

import { Effect } from 'effect'
import { makeBlockStream, BlockStreamError } from 'voltaire-effect/services'
import * as Stream from 'voltaire-effect/stream'

// Runs a header-only backfill and turns the two tagged failures it handles
// into log lines instead of letting them fail the program.
const robustBackfill = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()

  const history = blockStream.backfill({
    fromBlock: 18000000n,
    toBlock: 18100000n,
    include: 'header'
  })

  yield* Stream.runForEach(history, (event) =>
    Effect.log(`Got ${event.blocks.length} blocks`)
  )
}).pipe(
  // Generic stream failure: log as an error
  Effect.catchTag('BlockStreamError', (streamError) =>
    Effect.logError(`Stream failed: ${streamError.message}`)
  ),
  // Requested range rejected as too large: log as a warning
  Effect.catchTag('BlockRangeTooLargeError', (rangeError) =>
    Effect.logWarning(`Range too large: ${rangeError.fromBlock} to ${rangeError.toBlock}`)
  )
)

Layer Composition

import { Effect } from 'effect'
import { makeBlockStream, HttpTransport } from 'voltaire-effect/services'

// Transport layer defined once so several programs can share it
const MainnetTransport = HttpTransport('https://eth.llamarpc.com')

// Any program can be wired to it with Effect.provide
const program = Effect.provide(
  Effect.gen(function* () {
    const blockStream = yield* makeBlockStream()
    // ...
  }),
  MainnetTransport
)