Skip to main content

Transaction Streaming

The TransactionStream module provides Effect-native transaction streaming for monitoring pending and confirmed transactions on Ethereum.

Overview

makeTransactionStream wraps voltaire core’s TransactionStream with Effect Stream integration, enabling:
  • Pending transactions: Watch mempool for new transactions
  • Confirmed transactions: Stream transactions with confirmation requirements
  • Transaction tracking: Follow a specific transaction through its lifecycle

makeTransactionStream

The stream provides three streaming methods, each returning an Effect Stream:
import { Effect, Stream } from 'effect';
import { makeTransactionStream, HttpTransport } from 'voltaire-effect/services';

const program = Effect.gen(function* () {
  const txStream = yield* makeTransactionStream()
  
  yield* Stream.runForEach(
    txStream.watchConfirmed({ confirmations: 3 }),
    (event) => Effect.log(`Confirmed: ${event.transaction.hash}`)
  );
}).pipe(
    Effect.provide(HttpTransport('https://mainnet.infura.io/v3/YOUR_KEY'))
);

Watching Pending Transactions

Stream pending transactions from the mempool with optional filtering:
import { Effect, Stream } from 'effect';
import { makeTransactionStream } from 'voltaire-effect/services';

const watchPending = Effect.gen(function* () {
  const stream = yield* makeTransactionStream()
  
  yield* Stream.runForEach(
    stream.watchPending({ filter: { to: usdcAddress } }),
    (event) => Effect.log(`Pending: ${event.transaction.hash}`)
  );
});

Streaming Confirmed Transactions

Watch for transactions that meet confirmation requirements:
import { Effect, Stream } from 'effect';
import { makeTransactionStream } from 'voltaire-effect/services';

const watchConfirmed = Effect.gen(function* () {
  const stream = yield* makeTransactionStream()
  
  yield* Stream.runForEach(
    stream.watchConfirmed({ 
      filter: { from: myAddress }, 
      confirmations: 3 
    }),
    (event) => Effect.log(`Confirmed: ${event.transaction.hash}`)
  );
});

Tracking a Specific Transaction

Track a transaction through pending, confirmed, and dropped states:
import { Effect, Stream } from 'effect';
import { makeTransactionStream } from 'voltaire-effect/services';

const trackTransaction = (txHash: string) => Effect.gen(function* () {
  const stream = yield* makeTransactionStream()
  
  yield* Stream.runForEach(
    stream.track(txHash, { confirmations: 3 }),
    (event) => {
      if (event.type === 'confirmed') {
        return Effect.log(`Confirmed in block ${event.transaction.blockNumber}`);
      }
      if (event.type === 'dropped') {
        return Effect.log(`Dropped: ${event.reason}`);
      }
      return Effect.log(`Pending...`);
    }
  );
});

Error Handling

Transaction stream operations can fail with TransactionStreamError:
import { Effect, pipe } from 'effect';
import { makeTransactionStream } from 'voltaire-effect/services';

const program = pipe(
  watchConfirmedTransactions(),
  Effect.catchTag("TransactionStreamError", (error) =>
    Effect.gen(function* () {
      yield* Effect.logError(`Stream failed: ${error.message}`);
      // Access cause for underlying error details
      if (error.cause) {
        yield* Effect.logDebug(`Cause: ${error.cause.message}`);
      }
    })
  )
);

TransactionStreamError

Thrown when any transaction stream operation fails:
import { TransactionStreamError } from 'voltaire-effect/services';

const error = new TransactionStreamError("Connection lost", {
  cause: new Error("WebSocket disconnected"),
  context: { txHash: "0x..." }
});
// error._tag === "TransactionStreamError"
// error.name === "TransactionStreamError"

Integration with BlockStream

TransactionStream’s confirmation tracking internally uses BlockStream for monitoring block confirmations. The confirmed transaction stream waits for the specified number of blocks before emitting events:
import { Effect, Stream } from 'effect';
import { makeTransactionStream, HttpTransport } from 'voltaire-effect/services';

// Confirmations are tracked via internal BlockStream integration
const program = Effect.gen(function* () {
  const txStream = yield* makeTransactionStream()
  
  // Wait for 12 confirmations (common for high-value transactions)
  yield* Stream.runForEach(
    txStream.watchConfirmed({ confirmations: 12 }),
    (event) => Effect.log(`Safe confirmation: ${event.transaction.hash}`)
  );
}).pipe(
    Effect.provide(HttpTransport('https://...'))
);

Layer Composition

makeTransactionStream requires TransportService:
import { Effect } from 'effect';
import { makeTransactionStream, HttpTransport, WebSocketTransport } from 'voltaire-effect/services';

// HTTP transport
const httpProgram = Effect.gen(function* () {
  const txStream = yield* makeTransactionStream()
  // ...
}).pipe(Effect.provide(HttpTransport('https://mainnet.infura.io/v3/KEY')));

// WebSocket transport (better for real-time pending txs)
const wsProgram = Effect.gen(function* () {
  const txStream = yield* makeTransactionStream()
  // ...
}).pipe(Effect.provide(WebSocketTransport('wss://mainnet.infura.io/ws/v3/KEY')));

See Also