Watch New Blocks
Stream new blocks in real time with automatic reorg handling.
Ask AI
import { Effect, Stream } from 'effect'
import { makeBlockStream, HttpTransport } from 'voltaire-effect/services'
// Watches the chain and logs one line per stream event: a reorg reports how
// many blocks were removed; any other event reports the first block of the batch.
const program = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()
  const liveEvents = blockStream.watch({ include: 'transactions' })

  // Builds the log effect for a single event.
  const describe = (event) => {
    if (event.type !== 'reorg') {
      const head = event.blocks[0]
      return Effect.log(`Block ${head?.header.number}: ${head?.transactions.length} txs`)
    }
    return Effect.log(`Reorg! Removed ${event.removed.length} blocks`)
  }

  yield* Stream.runForEach(liveEvents, describe)
}).pipe(
  Effect.provide(HttpTransport('https://eth.llamarpc.com'))
)

await Effect.runPromise(program)
Backfill Historical Blocks
Fetch blocks in a range, with automatic chunking for large ranges.
Ask AI
// Backfills a fixed block range and reports how many blocks and transactions
// were seen once the stream has been fully consumed.
const backfillBlocks = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()
  const history = blockStream.backfill({
    fromBlock: 18000000n,
    toBlock: 18000100n,
    include: 'transactions'
  })

  // Running totals, updated once per batch.
  let blockCount = 0
  let txCount = 0
  const tally = (event) =>
    Effect.sync(() => {
      blockCount += event.blocks.length
      txCount += event.blocks.reduce((sum, b) => sum + b.transactions.length, 0)
    })

  yield* Stream.runForEach(history, tally)
  yield* Effect.log(`Processed ${blockCount} blocks with ${txCount} transactions`)
})
Include Levels
Control the data fetched per block:
Ask AI
// The three calls below request the same block range and differ only in
// the `include` option.

// Headers only (minimal data, fastest)
blockStream.backfill({
fromBlock: 18000000n,
toBlock: 18001000n,
include: 'header'
})
// Headers + full transaction objects
blockStream.backfill({
fromBlock: 18000000n,
toBlock: 18001000n,
include: 'transactions'
})
// Headers + transactions + receipts (most data)
blockStream.backfill({
fromBlock: 18000000n,
toBlock: 18001000n,
include: 'receipts'
})
Handle Reorganizations
BlockStream detects chain reorgs and provides undo/redo information:
Ask AI
// Watches the chain and, for each event, either replays a reorg (roll back
// the removed blocks, then apply the added ones) or logs the new blocks.
//
// The whole handler is an Effect.gen generator: `yield*` is only legal inside
// a generator function, so it cannot appear directly in a plain arrow callback.
const handleReorgs = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()
  yield* Stream.runForEach(
    blockStream.watch({ include: 'transactions' }),
    (event) =>
      Effect.gen(function* () {
        if (event.type === 'reorg') {
          // Undo state for removed blocks (newest first)
          for (const removed of event.removed) {
            yield* Effect.log(`Rollback block ${removed.number}`)
            // Your rollback logic here
          }
          // Apply state for added blocks (oldest first)
          for (const added of event.added) {
            yield* Effect.log(`Apply block ${added.header.number}`)
            // Your apply logic here
          }
          yield* Effect.log(`Common ancestor: ${event.commonAncestor.number}`)
          return
        }
        // Normal new blocks
        for (const block of event.blocks) {
          yield* Effect.log(`New block ${block.header.number}`)
        }
      })
  )
})
Stream Processing
Use Effect Stream operators for composable transformations:
Ask AI
// Backfills a fixed range, flattens the batches into individual blocks,
// reduces each block to a small summary, keeps only blocks that contain
// transactions, and collects the first 50 of those.
const processBlocks = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()

  // One stream element per block instead of one per batch.
  const unbatch = (event) => Stream.fromIterable(event.blocks)
  // Compact per-block record.
  const toSummary = (block) => {
    const { header, transactions } = block
    return {
      number: header.number,
      txCount: transactions.length,
      gasUsed: header.gasUsed,
      timestamp: header.timestamp
    }
  }
  const hasTransactions = (summary) => summary.txCount > 0

  const events = blockStream.backfill({
    fromBlock: 18000000n,
    toBlock: 18000100n,
    include: 'transactions'
  })
  const summaries = events.pipe(
    Stream.flatMap(unbatch),
    Stream.map(toSummary),
    Stream.filter(hasTransactions),
    Stream.take(50)
  )

  const results = yield* Stream.runCollect(summaries)
  yield* Effect.log(`Found ${results.length} blocks with transactions`)
})
Progress Tracking
Track progress for large backfill operations:
Ask AI
// Backfills headers over a large fixed range and logs progress about every
// 1000 processed blocks, plus once the expected total has been reached.
//
// Progress is reported when the running count crosses the next 1000-block
// threshold rather than when it is an exact multiple of 1000, because the
// count grows by a whole batch at a time and may never land on a multiple.
const backfillWithProgress = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()
  const fromBlock = 18000000n
  const toBlock = 18100000n
  // NOTE(review): if backfill yields both endpoints, the real total is one
  // higher than this difference — confirm against the BlockStream contract.
  const totalBlocks = Number(toBlock - fromBlock)
  const reportEvery = 1000
  let processedBlocks = 0
  let nextReportAt = reportEvery
  yield* Stream.runForEach(
    blockStream.backfill({ fromBlock, toBlock, include: 'header' }),
    (event) =>
      Effect.gen(function* () {
        // An empty batch changes nothing, so there is nothing new to report.
        if (event.blocks.length === 0) return
        processedBlocks += event.blocks.length
        // `>=` so the final report still fires if the count overshoots.
        const finished = processedBlocks >= totalBlocks
        if (!finished && processedBlocks < nextReportAt) return
        // Move the threshold to the next multiple above the current count.
        nextReportAt = (Math.floor(processedBlocks / reportEvery) + 1) * reportEvery
        const progress = (processedBlocks / totalBlocks) * 100
        yield* Effect.log(`Progress: ${progress.toFixed(1)}% (${processedBlocks}/${totalBlocks})`)
      })
  )
  yield* Effect.log('Backfill complete!')
})
Combine Backfill + Watch
Seamlessly transition from historical to live streaming:
Ask AI
// Replays history from a known block up to 'latest', then keeps following
// the chain, stopping after 200 stream events in total.
const historicalToLive = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()
  // Backfill from a known block
  const backfill = blockStream.backfill({
    fromBlock: 18000000n,
    toBlock: 'latest',
    include: 'header'
  })
  // Watch for new blocks
  const watch = blockStream.watch({ include: 'header' })
  // Combine: backfill first, then continue watching
  const combined = Stream.concat(backfill, watch)
  yield* Stream.runForEach(
    // Stream.take counts stream elements, i.e. events (batches), not
    // individual blocks: this stops after 200 events.
    Stream.take(combined, 200),
    (event) =>
      // The watch half of the stream can emit reorg events; handle them
      // before reading `event.blocks`, as the other watch consumers do.
      event.type === 'reorg'
        ? Effect.log(`Reorg! Removed ${event.removed.length} blocks`)
        : Effect.log(`Block batch: ${event.blocks.map(b => b.header.number).join(', ')}`)
  )
})
Cancellation
Gracefully stop streaming:
Ask AI
import { Effect, Stream, Fiber } from 'effect'
import { makeBlockStream, HttpTransport, withTimeout } from 'voltaire-effect/services'
// Forks a header watcher into its own fiber, lets it run for 30 seconds,
// then interrupts the fiber and logs that watching has stopped.
const withTimeoutExample = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()

  // Logs the number of the first block in each event.
  const logHead = (event) => Effect.log(`Block ${event.blocks[0]?.header.number}`)
  const watcher = Stream.runForEach(
    blockStream.watch({ include: 'header' }),
    logHead
  )

  // Run for 30 seconds then stop
  const fiber = yield* Effect.fork(watcher)
  yield* Effect.sleep("30 seconds")
  yield* Fiber.interrupt(fiber)

  yield* Effect.log('Stopped watching')
})
// Or use withTimeout helper for per-request timeouts
// NOTE(review): here withTimeout is piped over the whole program rather than
// a single request — confirm what scope the "30 seconds" limit applies to.
const program = Effect.gen(function* () {
const blockStream = yield* makeBlockStream()
// ...
}).pipe(
withTimeout("30 seconds"),
Effect.provide(HttpTransport('https://eth.llamarpc.com'))
)
Error Handling
Copy
Ask AI
import { Effect } from 'effect'
import { makeBlockStream, BlockStreamError } from 'voltaire-effect/services'
import * as Stream from 'voltaire-effect/stream'
// Backfills headers over a large range and logs the size of every batch.
// The two tagged failures are handled by logging: 'BlockStreamError' as an
// error and 'BlockRangeTooLargeError' as a warning.
const robustBackfill = Effect.gen(function* () {
  const blockStream = yield* makeBlockStream()
  const headers = blockStream.backfill({
    fromBlock: 18000000n,
    toBlock: 18100000n,
    include: 'header'
  })
  const logBatchSize = (event) => Effect.log(`Got ${event.blocks.length} blocks`)

  yield* Stream.runForEach(headers, logBatchSize)
}).pipe(
  Effect.catchTag('BlockStreamError', (streamError) => {
    return Effect.logError(`Stream failed: ${streamError.message}`)
  }),
  Effect.catchTag('BlockRangeTooLargeError', (rangeError) => {
    return Effect.logWarning(`Range too large: ${rangeError.fromBlock} to ${rangeError.toBlock}`)
  })
)
Layer Composition
Copy
Ask AI
import { Effect } from 'effect'
import { makeBlockStream, HttpTransport } from 'voltaire-effect/services'
// Reusable transport layer: built once here so it can be provided to any
// number of programs instead of repeating the endpoint URL in each one.
const MainnetTransport = HttpTransport('https://eth.llamarpc.com')
// Use in any program: the program body obtains a block stream, and the
// transport is supplied at the end via Effect.provide.
const program = Effect.gen(function* () {
const blockStream = yield* makeBlockStream()
// ...
}).pipe(Effect.provide(MainnetTransport))

