// index.ts
import { Effect, Layer, Console, Schedule } from "effect"
// ============================================
// 1. Event Topic Hashes (pre-computed)
// ============================================
// These are keccak256 hashes of event signatures.
// Pre-compute them - don't hash at runtime.
/**
 * topic0 values for the Uniswap V2 events this script knows about.
 *
 * Each value is the keccak256 hash of the event's canonical signature,
 * stored as a lower-case, 0x-prefixed hex literal so nothing has to be
 * hashed while the program runs.
 */
const UNISWAP_V2_TOPICS = {
  /** keccak256("Swap(address,uint256,uint256,uint256,uint256,address)") */
  Swap: "0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822",

  /** keccak256("Sync(uint112,uint112)") */
  Sync: "0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1",

  /** keccak256("PairCreated(address,address,address,uint256)") */
  PairCreated: "0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9",
} as const
// ============================================
// 2. Simple RPC Client with Fallback
// ============================================
/**
 * JSON-RPC endpoints in priority order: the first entry is tried first and
 * the remaining entries serve as fallbacks.
 */
const RPC_URLS: string[] = [
  "https://eth.llamarpc.com",
  "https://rpc.ankr.com/eth",
  "https://ethereum.publicnode.com",
]
/**
 * Failure value produced when a JSON-RPC request cannot be completed.
 *
 * A plain tagged class (it does not extend `Error`); `_tag` is the literal
 * discriminant and `message` carries the human-readable description.
 */
class RpcError {
  readonly message: string
  readonly _tag: "RpcError"

  constructor(message: string) {
    // `message` is assigned before `_tag` so own-property order stays the same.
    this.message = message
    this._tag = "RpcError"
  }
}
/**
 * Performs a JSON-RPC 2.0 request against the endpoints listed in `RPC_URLS`.
 *
 * Endpoints are tried in list order. Each endpoint gets its own retry policy
 * (exponential backoff starting at 100 ms, composed with `recurs(2)`) before
 * the next endpoint is attempted. If every endpoint fails, the resulting
 * effect fails with the `RpcError` of the last endpoint tried.
 *
 * A response counts as a failure (and therefore triggers retry/fallback) when:
 *  - the HTTP status is not 2xx,
 *  - the body is not a JSON object,
 *  - the body carries a JSON-RPC `error` member, or
 *  - the body has no `result` member at all.
 *
 * @param method JSON-RPC method name, e.g. `"eth_blockNumber"`.
 * @param params Positional parameters for the method.
 * @returns An effect yielding the untyped `result` member of the response,
 *          or failing with `RpcError`.
 */
const rpcCall = (method: string, params: unknown[]) => {
  const tryUrl = (url: string) =>
    Effect.tryPromise({
      try: async () => {
        const res = await fetch(url, {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify({ jsonrpc: "2.0", id: 1, method, params }),
        })
        // Gateways and rate limiters answer with non-2xx and arbitrary bodies;
        // without this check such a body could be mistaken for a success.
        if (!res.ok) {
          throw new Error(`HTTP ${res.status} ${res.statusText}`)
        }
        const json = await res.json()
        if (json === null || typeof json !== "object") {
          throw new Error("malformed JSON-RPC response: body is not an object")
        }
        if (json.error) {
          // Fall back to the serialized error when it has no `message` member.
          throw new Error(json.error.message ?? JSON.stringify(json.error))
        }
        // A missing `result` must not resolve to `undefined`: that would skip
        // retry/fallback and only blow up later in the caller.
        if (!("result" in json)) {
          throw new Error("malformed JSON-RPC response: missing result")
        }
        return json.result
      },
      catch: (e) => new RpcError(`${url}: ${e}`),
    }).pipe(
      Effect.retry(Schedule.exponential("100 millis").pipe(Schedule.compose(Schedule.recurs(2))))
    )
  // Try each URL in sequence until one succeeds
  return RPC_URLS.slice(1).reduce(
    (acc, url) => acc.pipe(Effect.orElse(() => tryUrl(url))),
    tryUrl(RPC_URLS[0]!)
  )
}
// ============================================
// 3. Decode Swap Event (manual hex slicing)
// ============================================
/**
 * A Swap event log after decoding, as returned by `decodeSwapLog`.
 */
interface DecodedSwap {
// Address of the contract that emitted the log, lower-cased.
pairAddress: string
// Lower-cased 0x-prefixed address taken from the last 20 bytes of topics[1].
sender: string
// Lower-cased 0x-prefixed address taken from the last 20 bytes of topics[2].
to: string
// First 32-byte word of the log data.
amount0In: bigint
// Second 32-byte word of the log data.
amount1In: bigint
// Third 32-byte word of the log data.
amount0Out: bigint
// Fourth 32-byte word of the log data.
amount1Out: bigint
// Block number parsed from the log's hex `blockNumber` string.
blockNumber: number
// The log's `transactionHash`, copied through unchanged.
txHash: string
}
/**
 * Decodes a raw `eth_getLogs` entry into a `DecodedSwap`.
 *
 * Layout expected for a Uniswap V2 `Swap` log:
 *  - topics[0]: event signature hash
 *  - topics[1]: indexed `sender`, left-padded to 32 bytes
 *  - topics[2]: indexed `to`, left-padded to 32 bytes
 *  - data: four 32-byte words: amount0In, amount1In, amount0Out, amount1Out
 *
 * Logs are fetched without an address filter, so any contract that emits an
 * event with the same signature hash but a different indexed layout can show
 * up here. Such logs are rejected with `null` instead of throwing, so one
 * odd log cannot abort decoding of the whole batch.
 *
 * @param log Raw log object as returned by the JSON-RPC node.
 * @returns The decoded swap, or `null` when topic0 is not the Swap topic,
 *          an indexed topic is missing, or the data is shorter than four words.
 */
const decodeSwapLog = (log: {
  address: string
  topics: string[]
  data: string
  blockNumber: string
  transactionHash: string
}): DecodedSwap | null => {
  const WORD_HEX_CHARS = 64 // 32 bytes
  const WORD_COUNT = 4

  const [topic0, senderTopic, recipientTopic] = log.topics
  // Hex is case-insensitive; normalise before comparing with the constant.
  if (topic0 === undefined || topic0.toLowerCase() !== UNISWAP_V2_TOPICS.Swap) return null
  // Same signature hash but fewer indexed params: not the layout decoded here.
  if (senderTopic === undefined || recipientTopic === undefined) return null

  const data = log.data.slice(2) // remove 0x
  // Too-short data would make BigInt("0x") throw a SyntaxError.
  if (data.length < WORD_COUNT * WORD_HEX_CHARS) return null

  // Reads the i-th 32-byte word of the data section as an unsigned integer.
  const word = (i: number): bigint =>
    BigInt("0x" + data.slice(i * WORD_HEX_CHARS, (i + 1) * WORD_HEX_CHARS))

  // An address occupies the last 20 bytes (40 hex chars) of a padded topic.
  const addressOf = (topic: string): string => ("0x" + topic.slice(-40)).toLowerCase()

  return {
    pairAddress: log.address.toLowerCase(),
    sender: addressOf(senderTopic),
    to: addressOf(recipientTopic),
    amount0In: word(0),
    amount1In: word(1),
    amount0Out: word(2),
    amount1Out: word(3),
    blockNumber: parseInt(log.blockNumber, 16),
    txHash: log.transactionHash,
  }
}
// ============================================
// 4. Main Program
// ============================================
/**
 * Program body: looks up the chain head, fetches every log carrying the
 * Swap topic from block `head - 10` through `head` (inclusive), decodes them
 * and prints a short summary of at most ten swaps.
 */
const main = Effect.gen(function* () {
  // Banner
  yield* Console.log("🦄 Uniswap V2 Swap Indexer")
  yield* Console.log("==========================\n")

  // Chain head
  const headHex = yield* rpcCall("eth_blockNumber", [])
  const head = BigInt(headHex)
  yield* Console.log(`Latest block: ${head}`)

  // Block window: ten blocks back from the head, head included
  const start = head - 10n
  yield* Console.log(`Fetching swaps from block ${start} to ${head}...\n`)

  const toHexQuantity = (value: bigint) => "0x" + value.toString(16)
  const filter = {
    fromBlock: toHexQuantity(start),
    toBlock: toHexQuantity(head),
    topics: [[UNISWAP_V2_TOPICS.Swap]], // OR filter for topic0
  }
  const rawLogs = yield* rpcCall("eth_getLogs", [filter])

  // Keep only the logs that decode to a swap
  const swaps = (rawLogs as any[]).flatMap((raw): DecodedSwap[] => {
    const decoded = decodeSwapLog(raw)
    return decoded === null ? [] : [decoded]
  })
  yield* Console.log(`Found ${swaps.length} swaps:\n`)

  // Print the first ten entries only
  const preview = swaps.slice(0, 10)
  for (const { amount0In, pairAddress, blockNumber, txHash } of preview) {
    // Labelled SELL when amount0In is non-zero, BUY otherwise
    const side = amount0In > 0n ? "SELL" : "BUY"
    const pairShort = pairAddress.slice(0, 10)
    const txShort = txHash.slice(0, 10)
    yield* Console.log(
      `[${side}] Pair: ${pairShort}... | ` +
      `Block: ${blockNumber} | TX: ${txShort}...`
    )
  }

  // Mention how many entries were left out of the preview
  const remaining = swaps.length - preview.length
  if (remaining > 0) {
    yield* Console.log(`\n... and ${remaining} more swaps`)
  }

  yield* Console.log("\n✅ Done!")
})
// Entry point: run the program and hand any rejection to console.error.
Effect.runPromise(main).catch((failure: unknown) => {
  console.error(failure)
})