Skip to main content

Overview

EventStream provides Effect-native contract event streaming with:
  • Backfill - Historical event fetching with dynamic chunking
  • Watch - Real-time event polling with deduplication
  • Retry logic - Exponential backoff for transient errors
  • Type safety - ABI-typed event args via generics

Quick Start

import { Effect, Stream } from 'effect'
import { makeEventStream, HttpTransport } from 'voltaire-effect'

const transferEvent = {
  type: 'event',
  name: 'Transfer',
  inputs: [
    { name: 'from', type: 'address', indexed: true },
    { name: 'to', type: 'address', indexed: true },
    { name: 'value', type: 'uint256', indexed: false }
  ]
} as const

const program = Effect.gen(function* () {
  const eventStream = yield* makeEventStream()

  // Backfill historical Transfer events
  yield* Stream.runForEach(
    eventStream.backfill({
      address: '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48',
      event: transferEvent,
      fromBlock: 18000000n,
      toBlock: 18001000n
    }),
    ({ log, metadata }) => Effect.log(`Transfer: ${log.args.value} at block ${log.blockNumber}`)
  )
}).pipe(
    Effect.provide(HttpTransport('https://eth.llamarpc.com'))
)

Backfill

Fetch historical events within a block range. Uses dynamic chunking to handle large ranges efficiently.
const stream = eventStream.backfill({
  address: contractAddress,
  event: transferEvent,
  fromBlock: 18000000n,
  toBlock: 19000000n,
  chunkSize: 500,        // Initial chunk size (default: 100)
  minChunkSize: 10       // Minimum after reduction (default: 10)
})
Chunk size automatically:
  • Reduces on “block range too large” errors
  • Increases after consecutive successes

Watch

Poll for new events in real-time.
const stream = eventStream.watch({
  address: contractAddress,
  event: approvalEvent,
  fromBlock: 18000000n,   // Optional, defaults to current block
  pollingInterval: 1000   // Poll every 1s (default)
})

// Use with Stream.take to limit events
yield* Stream.runForEach(
  Stream.take(stream, 10),
  ({ log }) => Effect.log(`New approval: ${log.args.spender}`)
)

Topic Filtering

Filter events by indexed parameters:
const myTransfers = eventStream.backfill({
  address: contractAddress,
  event: transferEvent,
  filter: { from: myAddress },  // Only transfers FROM myAddress
  fromBlock: 18000000n,
  toBlock: 18001000n
})

Event Result

Each yielded result contains:
interface EventStreamResult<TEvent> {
  log: {
    eventName: string
    args: { from: string, to: string, value: bigint }  // Typed from ABI
    blockNumber: bigint
    blockHash: Uint8Array
    transactionHash: Uint8Array
    logIndex: number
  }
  metadata: {
    chainHead: bigint    // Current chain tip
    fromBlock: bigint    // Range start
    toBlock: bigint      // Range end
  }
}

Error Handling

import { EventStreamError } from 'voltaire-effect'

program.pipe(
  Effect.catchTag('EventStreamError', (e) => 
    Effect.log(`Stream error: ${e.message}`)
  )
)
Underlying errors:
  • EventStreamAbortedError - Stream was aborted via signal
  • BlockRangeTooLargeError - RPC rejected range (handled internally via chunking)

Retry Configuration

const stream = eventStream.backfill({
  address: contractAddress,
  event: transferEvent,
  fromBlock: 18000000n,
  toBlock: 19000000n,
  retry: {
    maxRetries: 5,        // Default: 3
    initialDelay: 500,    // Default: 1000ms
    maxDelay: 60000       // Default: 30000ms
  }
})

Layer Composition

import { Effect } from 'effect'
import { makeEventStream, HttpTransport } from 'voltaire-effect'

// Reusable transport layer
const AppLayer = HttpTransport('https://eth.llamarpc.com')

// Use in program
const program = Effect.gen(function* () {
  const eventStream = yield* makeEventStream()
  // ...
}).pipe(Effect.provide(AppLayer))