Emitting events
A connector's second job, after providing tools, is turning what its source system says into EventEnvelopes: the one canonical event that a Shopify webhook, an MQTT temperature reading, and a cron tick all become. This page is the field-level guide for connector authors: what you must set, how to reference entities so single-flight works downstream, how to keep duplicates out with dedupe keys, and how to batch, backfill, and reason about ordering. Field names here match packages/kernel/src/envelope.ts exactly.
From source payload to envelope
The path is short and the division of labor is fixed. Your connector declares its streams in the def's events record and maps each source payload to the envelope fields it owns; the platform stamps identity and tenancy and puts the result on the bus, where the router matches it against operator triggers.
export interface EventEnvelope {
event_id: string;
reseller_id: string | null; // null = Fibric-direct. Present on EVERY envelope.
tenant_id: string;
workspace_id: string | null;
source: string; // "shopify" | "bacnet-gw-7" | "cron" | "operator:jenny" | ...
event_type: string; // "order.created" | "hvac.zone.fault" | ...
correlation_id: string;
payload: Record<string, unknown>;
agent_id: string | null;
session_id: string | null;
}
Ingestion never acts. An envelope can only cause an operator to propose a plan, and the deterministic executor disposes that plan under policy. Emit freely; the governance boundary is downstream of you.
Declaring event streams
Each entry in the def's events record declares one stream, keyed by the event_type it emits:
events: {
'order.created': { kind: 'webhook', topic: 'orders/create' },
'order.updated': { kind: 'webhook', topic: 'orders/updated' },
'inventory.level': { kind: 'poll' }, // no webhook offered upstream
},
| Field | Type | Required | Description |
|---|---|---|---|
| key | string | yes | The event_type this stream emits, in dotted noun.verb form. Operators match triggers (order.*) against it. |
kind | 'webhook' | 'poll' | yes | webhook: the source pushes; the platform provisions an ingest URL per connection and verifies signatures. poll: the platform schedules pulls; the shape for hardware and APIs without webhooks. |
topic | string | no | The upstream subscription name: a webhook topic, an MQTT topic, a queue. Documentation for connection setup, and what the platform subscribes to where it can do so automatically. |
Envelope fields you control
Of the ten envelope fields, four are stamped by the platform and are not yours to set; six are yours. Getting the split right is most of this page.
| Field | Who sets it | Guidance |
|---|---|---|
event_id | platform | Assigned at ingest. Never supply one; supply a dedupe key instead. |
reseller_id, tenant_id, workspace_id | platform | Stamped from the connection. A connector cannot emit into another tenant; isolation is structural, not a field you fill in correctly. |
source | you | Your connector id, or a per-device identifier for hardware fan-in: cn-magento, bacnet-gw-7. Stable, lowercase, meaningful in a receipt. |
event_type | you | Must be one of the types declared in your def's events record. Emitting an undeclared type is rejected at ingest. |
correlation_id | you, when continuing a thread | Omit for a fresh observation; the platform generates one. Set it when the event continues existing work, for example a delivery confirmation for an action an operator took. |
payload | you | The observation, normalized. See below. |
agent_id, session_id | you, operators only | Null for external observations. Operator packs set them so their own output is attributable in the stream. |
For payload, normalize rather than forward. Emit the fields an operator can reason over, under the names the capability vocabulary already uses, and keep the vendor's raw blob out or under a single raw key if downstream debugging truly needs it. A payload is not a place to smuggle a second schema.
// what the platform asks your connector for, per declared stream:
// a pure mapping from the source payload to the envelope fields you own.
export function mapOrderCreated(hook: MagentoOrderHook) {
return {
source: 'cn-magento',
event_type: 'order.created',
dedupe_key: `magento:order:${hook.entity_id}:created`, // see below
payload: {
order_id: String(hook.increment_id),
entity: { kind: 'order', id: String(hook.increment_id) },
total: Number(hook.grand_total),
currency: hook.order_currency_code,
customer_email: hook.customer_email,
placed_at: hook.created_at,
},
};
}
Entity references
Downstream, the executor serializes side effects per entity using each planned action's entity_key, and operators build those keys from what your payload tells them. The convention that makes this work is an explicit entity reference in every payload:
payload: {
entity: { kind: 'order', id: 'SO-11290' }, // what this event is ABOUT
// ...the rest of the observation
}
// an operator sensing this event derives, mechanically:
// entity_key: 'order:SO-11290'
// idempotency_key: 'order-risk:order:SO-11290:hold'
- One primary entity per event. If a source webhook describes two things, emit two envelopes. An event about "an order and its customer" is an event about the order that mentions the customer.
- Use the source system's durable id, as a string, never a display name or an index that renumbers.
- Keep
kindin the shared vocabulary:order,conversation,shipment,zone,device. The same nouns your tool capabilities use.
Skipping the entity reference does not break ingest, but it breaks the thing that matters: an operator that cannot identify the entity cannot construct a correct entity_key, and single-flight degrades from per-entity to nothing.
Dedupe keys
Sources are unreliable in one specific way: they deliver twice. Webhooks retry, pollers overlap, gateways reconnect and replay. The envelope bus dedupes at ingest on a dedupe_key you supply with each emission; two emissions with the same key within the dedupe window collapse into one envelope, and the second is acknowledged but not re-routed.
// source : entity kind : entity id : what happened [ : version discriminator ]
dedupe_key: `magento:order:${id}:created` // creation happens once
dedupe_key: `magento:order:${id}:updated:${hook.updated_at}` // updates recur; discriminate
dedupe_key: `bacnet-gw-7:zone:12:reading:${ts_bucket}` // sensor sample per interval
| Rule | Why |
|---|---|
| Derive the key from the source's own identifiers, never from a hash of the whole payload. | Vendors reorder JSON keys and add fields between retries; a payload hash makes retries look like new events. |
| For recurring event types, include a version discriminator: the source's updated-at, revision number, or sample bucket. | Without one, the second legitimate update dedupes against the first and is lost. |
| Do not include your own timestamps or random values. | They defeat dedupe entirely; every retry becomes a fresh key. |
Ingest dedupe (your dedupe_key) keeps duplicate observations off the bus. Executor dedupe (the plan's idempotency_key) keeps duplicate side effects out of the world. They are independent, and the second still protects you when the first misses, which is exactly the layering that made the 657-message flood structurally impossible to repeat.
Batching
Poll-kind streams and busy webhooks produce bursts. Emit them as a batch, not a loop of single publishes: a batch is atomic at ingest (all envelopes accepted or none), dedupe applies per element, and it counts once against connection throughput rather than per element.
// a poll handler returns the batch; the platform ingests it atomically
export async function pollInventory(ctx: ConnectorCtx) {
const levels = await fetchLevels(ctx); // one upstream call
return levels.map((l) => ({
source: 'cn-magento',
event_type: 'inventory.level',
dedupe_key: `magento:sku:${l.sku}:level:${l.as_of}`,
payload: { entity: { kind: 'sku', id: l.sku }, qty: l.qty, as_of: l.as_of },
}));
}
- Keep batches under 500 envelopes; split larger sets. Ceilings per connection are on rate limits & quotas.
- Order within a batch is preserved onto the bus, which makes a batch the right unit for a page of time-ordered source records.
- Do not aggregate distinct observations into one envelope to save volume. Fifty zone readings are fifty envelopes; an operator subscribes to what it needs, and a mega-payload defeats routing.
Backfill
When a connection is first made, or repaired after an outage, the tenant usually wants history. Backfill is the same mapping run over the source's list APIs, with two switches that make it safe:
# replay two weeks of source history through the connector's mapping
$ fibric connectors backfill cn-magento --connection magento-live \
--stream order.created --since 2026-06-18 --mode quiet
fetched 3,412 records · emitted 3,412 envelopes (dedupe skipped 0)
mode quiet: envelopes stored and queryable; operators were not triggered
| Mode | Behavior | Use when |
|---|---|---|
quiet | Envelopes land in the store and are queryable, but the router does not trigger operators. | Seeding history. You rarely want an operator reacting today to an order from June. |
live | Envelopes route normally. | Closing a short gap after an outage, where the reactions are still wanted. |
Because your dedupe keys are derived from source identifiers, backfill is naturally re-runnable: a second pass over the same window emits nothing new. This is the property to test before shipping a connector; fibric dev replay --twice exercises it locally, see fixture replay.
Ordering guidance
The bus preserves emission order per connection and per batch, but sources do not: webhook retries arrive late, pollers see state after several changes collapsed, and two gateways racing emit interleaved. Design for it rather than around it:
- Carry the source's own timestamp in the payload (
placed_at,as_of,observed_at) and have operators reason from it, never from arrival order. - Emit state, not diffs, where the source allows. "The order's status is now
holded" survives reordering; "the status changed" does not. - Let the executor absorb the residue. When two events about one entity race two plans, single-flight on
entity_keyserializes their side effects, and idempotency keys collapse the duplicate ones. Correct entity references buy you ordering tolerance for free. - Never buffer to reorder inside the connector. A connector that holds events back to sort them adds latency and a crash-loss window, and still cannot beat a late webhook retry.
Keep going
- The event envelope: the concept page for the shape you are emitting.
- Exposing actions: the other half of a connector's surface.
- Single-flight & idempotency: what your entity references enable downstream.
- Local development: tailing and replaying envelopes against the local kernel.