Skip to main content
You're viewing v3 documentation

This is the v3 HyperIndex documentation. Still on an older version? Open the v2 documentation and consider migrating to v3.

Kafka

Publish decoded events to one or more Kafka topics. Use a partition key to fan out across partitions (with unorderedAfterCommit) or pin the topic to a single partition for strict ordering (with orderedAfterCommit).

Installation

pnpm add kafkajs

Configure the client

src/clients/kafka.ts
import { Kafka, Partitioners } from "kafkajs";

export const kafka = new Kafka({
clientId: "envio-indexer",
brokers: (process.env.KAFKA_BROKERS ?? "").split(","),
ssl: true,
sasl: {
mechanism: "plain",
username: process.env.KAFKA_USERNAME!,
password: process.env.KAFKA_PASSWORD!,
},
});

export const producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
allowAutoTopicCreation: false,
});

let connected: Promise<void> | undefined;
export const ensureConnected = () => (connected ??= producer.connect());

Define the effect

Topic name is baked into the effect — input carries only the partition key and the per-event payload. The effect serialises to JSON internally; the handler passes raw values.

src/effects/kafka.ts
import { createEffect, S } from "envio";
import { ensureConnected, producer } from "../clients/kafka";

const TOPIC = "rocketpool.transfers";

export const publishTransfer = createEffect(
{
name: "publishTransfer",
input: {
key: S.string, // partition key
from: S.string,
to: S.string,
value: S.bigint,
txHash: S.string,
blockNumber: S.number,
chainId: S.number,
},
rateLimit: { calls: 200, per: "second" },
// unordered = parallel dispatch, partition order preserved by `key`
mode: "unorderedAfterCommit",
},
async ({ input }) => {
await ensureConnected();
const { key, value, ...rest } = input;
await producer.send({
topic: TOPIC,
acks: -1, // "all"
messages: [
{
key,
value: JSON.stringify({ ...rest, value: value.toString() }),
},
],
});
}
);

Call it from a handler

The rindexer config…

streams:
kafka:
topics:
- topic: test-topic
key: my-routing-key
events:
- event_name: Transfer
conditions:
- "value": ">=2000000000000000000"
- "from": "0x0338ce5020c447f7e668dc2ef778025ce3982662"

…becomes:

src/handlers/RocketPoolETH.ts
import { indexer } from "envio";
import { publishTransfer } from "../effects/kafka";

const WHALE = "0x0338ce5020c447f7e668dc2ef778025ce3982662";
const MIN = 2_000_000_000_000_000_000n;

indexer.onEvent(
{ contract: "RocketPoolETH", event: "Transfer" },
async ({ event, context }) => {
const { from, to, value } = event.params;

if (from.toLowerCase() === WHALE && value >= MIN) {
context.effect(publishTransfer, {
key: from, // partition by sender so per-sender order is preserved
from,
to,
value,
txHash: event.transaction.hash,
blockNumber: event.block.number,
chainId: context.chain.id,
});
}
},
);

Mode picker

You needMode
High throughput, partition by keyunorderedAfterCommit
Strict global order across the whole topicorderedAfterCommit (and a single-partition topic)
Lowest latency, idempotent consumerunordered (parallel, inline) or ordered (sequential, inline) — fires before the DB commit

Raw fetch alternative

If you'd rather not pull in kafkajs, point the effect at a Kafka REST proxy (e.g. Confluent REST Proxy) and POST JSON instead.