Example projects
Five complete, runnable projects, each small enough to read in one sitting and each demonstrating one discipline the platform cares about: a read-only Slack connector that only senses, a bridge that turns arbitrary webhooks into governed envelopes, an MQTT operator that proves hardware and SaaS flow through identical machinery, an action tool that honors the idempotency contract in full, and a scheduled operator that reports without acting. Every example uses the real SDK surface from the Connector SDK and runs against the local kernel from local development.
How to read these
Each example is a complete project: one defineConnector() or defineOperator() definition, a fixture file where replay is the point, and the commands to run it. The code is shown in full rather than in fragments, because the discipline these examples teach lives in the parts fragments usually omit: the dedupe keys, the preconditions, the honest error paths.
Conventions shared by all five:
- Scaffold with fibric init <name> --template connector (or --template operator), then replace src/index.ts with the example.
- Imports come from @fibric/connector-sdk for connectors and tools, @fibric/sdk for operators. Operators are connectors too; the split is ergonomic, not architectural.
- Nothing here calls a side effect directly. Reads run freely; writes are proposed as plan actions and disposed by the deterministic executor under a default-closed policy.
- Every project passes the two-pass replay test before it ships: fibric dev replay fixtures/sample.jsonl --twice --strict must come back all DEDUP on the second pass. See running any of these.
| Example | Kind | The discipline it demonstrates |
|---|---|---|
| Slack digest | connector, read-only | Ingesting a poll-based stream with stable dedupe keys and no write surface at all. |
| Webhook bridge | connector, ingest-only | Verifying, refusing, and mapping arbitrary POSTs onto the canonical envelope. |
| Temperature watch | connector + operator | Hardware through the same machinery: sense MQTT, propose a setpoint, dispose under policy. |
| Order hold | action tool | The full idempotency contract: dedup, single-flight, and a converging handler. |
| Nightly report | operator, scheduled | A date-keyed side effect that structurally cannot send twice, and an empty plan on quiet days. |
Read-only Slack digest connector
The smallest honest connector: it polls the Slack conversations API for new messages in the channels you configure, turns each one into an envelope, and exposes exactly one tool, a read. There is no write path at all, which means no policy question ever arises; an operator downstream can summarize, count, or correlate, and anything it wants to do about a message must go through some other connector's governed action.
import { defineConnector, tool, oauth2, type ConnectorCtx } from '@fibric/connector-sdk';
interface SlackMessage {
ts: string; // Slack's message timestamp doubles as its id
channel: string;
user: string;
text: string;
thread_ts?: string;
}
async function fetchNewMessages(ctx: ConnectorCtx): Promise<SlackMessage[]> {
const channels = (ctx.config.channels as string[]) ?? [];
const out: SlackMessage[] = [];
for (const channel of channels) {
// in prod, ctx.http is per-tenant, rate-limited, retrying, creds injected
const res = await slackApi(ctx, 'conversations.history', {
channel,
oldest: String(ctx.config.cursor ?? 0),
limit: 200,
});
for (const m of res.messages) out.push({ ...m, channel });
}
return out;
}
export default defineConnector({
id: 'cn-slack-digest',
version: '1.0.0',
category: 'comms',
publisher: 'private',
auth: oauth2({ scopes: ['channels:history', 'channels:read'] }),
tools: {
// the one read an operator can sense through. no sideEffecting flag:
// reads need no policy and no idempotency.
'message.read': tool({
input: (args) => {
const a = args as Record<string, unknown>;
if (typeof a.channel !== 'string') throw new Error('channel is required');
return a;
},
handler: async (ctx, args) =>
slackApi(ctx, 'conversations.history', { channel: args.channel, limit: 50 }),
}),
},
events: {
// the digest stream: each new message becomes one envelope
'message.posted': { kind: 'poll', topic: 'conversations.history' },
},
probe: (ctx) => ({
status: 'ok',
metric: { label: 'channels watched', value: (ctx.config.channels as string[])?.length ?? 0 },
}),
});
The poll handler maps each message onto the envelope fields documented in emitting events. Slack's ts is the natural dedupe key: it is unique per channel and stable across polls, so a poll window that overlaps the previous one re-emits nothing.
export async function poll(ctx: ConnectorCtx, emit: (e: EmitInput) => void) {
for (const m of await fetchNewMessages(ctx)) {
emit({
event_type: 'message.posted',
dedupe_key: `slack:${m.channel}:${m.ts}`, // stable across overlapping polls
payload: {
entity: { kind: 'channel', id: m.channel },
user: m.user,
text: m.text,
thread_ts: m.thread_ts ?? null,
},
});
}
}
What to notice: the connector never summarizes. Digesting is reasoning, and reasoning belongs to an operator; the connector's whole job is to make the raw signal arrive as well-formed, deduplicated envelopes. The nightly report operator below is a natural consumer.
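To see why a key built from the channel and ts makes overlapping poll windows harmless, here is a minimal sketch in plain TypeScript. The seen set stands in for the kernel's dedupe store and ingest is a hypothetical helper, not an SDK function; only the key format comes from the poll handler above.

```typescript
// Hypothetical stand-ins: the real dedupe store lives in the kernel.
interface Msg { channel: string; ts: string; text: string }

// Same key format as the poll handler above.
const dedupeKey = (m: Msg): string => `slack:${m.channel}:${m.ts}`;

// Returns only the messages whose key has not been seen before.
function ingest(seen: Set<string>, batch: Msg[]): Msg[] {
  const fresh: Msg[] = [];
  for (const m of batch) {
    const key = dedupeKey(m);
    if (seen.has(key)) continue; // overlap with an earlier poll: drop it
    seen.add(key);
    fresh.push(m);
  }
  return fresh;
}

// Two polls whose windows overlap on the message at ts 1700000002.000200.
const seen = new Set<string>();
const poll1: Msg[] = [
  { channel: 'C01', ts: '1700000001.000100', text: 'deploy started' },
  { channel: 'C01', ts: '1700000002.000200', text: 'deploy finished' },
];
const poll2: Msg[] = [
  { channel: 'C01', ts: '1700000002.000200', text: 'deploy finished' },
  { channel: 'C01', ts: '1700000003.000300', text: 'all green' },
];

const first = ingest(seen, poll1);
const second = ingest(seen, poll2);
console.log(first.length, second.length); // 2 1
```

The second poll contributes only the message it had not delivered before, which is the property the two-pass replay checks for at the envelope level.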
Webhook-to-envelope bridge
Plenty of systems can POST JSON somewhere but will never have a first-class connector. The bridge accepts any webhook, validates the little it can, and maps it onto the canonical EventEnvelope shape so the rest of the platform never knows the source was ad hoc. This is the pattern behind every kind: 'webhook' event stream; here it is isolated so you can see all of it.
import { defineConnector, apiKey } from '@fibric/connector-sdk';
import { createHmac, timingSafeEqual } from 'node:crypto';
export default defineConnector({
id: 'cn-webhook-bridge',
version: '1.0.0',
category: 'data',
publisher: 'private',
// the sender authenticates to US with a shared secret; see connector auth patterns
auth: apiKey(),
tools: {}, // nothing to read back, nothing to act on: this connector only ingests
events: {
'bridge.received': { kind: 'webhook' },
},
});
The webhook handler does three jobs, in order: verify the sender, refuse what it cannot map, and emit an envelope whose fields mean what the platform says they mean. The kernel stamps tenant_id, reseller_id, and event_id; the bridge's responsibility is event_type, correlation_id, the entity reference, and the dedupe key.
interface BridgePayload {
type: string; // becomes event_type, namespaced below
id: string; // the sender's id for this occurrence
entity?: { kind: string; id: string };
data?: Record<string, unknown>;
correlation?: string;
}
export async function onWebhook(ctx: ConnectorCtx, req: WebhookRequest, emit: (e: EmitInput) => void) {
// 1. verify: HMAC over the raw body with the shared secret
const mac = createHmac('sha256', ctx.secret('signing_key')).update(req.rawBody).digest();
const given = Buffer.from(req.headers['x-bridge-signature'] ?? '', 'hex');
if (given.length !== mac.length || !timingSafeEqual(mac, given)) {
return req.reject(401, 'bad signature');
}
// 2. refuse what cannot be mapped; a webhook with no type or id is noise, not signal
const body = req.json as BridgePayload;
if (!body?.type || !body?.id) return req.reject(422, 'type and id are required');
if (!/^[a-z_]+(\.[a-z_]+)+$/.test(body.type)) {
return req.reject(422, 'type must be noun.verb, e.g. invoice.paid');
}
// 3. emit: one occurrence, one envelope, deduplicated on the sender's id
emit({
event_type: `bridge.${body.type}`,
dedupe_key: `bridge:${body.type}:${body.id}`, // sender retries collapse here
correlation_id: body.correlation, // optional; kernel generates if absent
payload: {
...body.data,
entity: body.entity ?? null, // set last so a stray data.entity cannot overwrite the reference
},
});
req.accept(202);
}
Point anything at it and watch the stream normalize:
$ fibric dev # webhooks on :4310
$ curl -s -X POST http://localhost:4310/webhooks/cn-webhook-bridge \
-H "x-bridge-signature: $(sign payload.json)" \
-d '{"type":"invoice.paid","id":"inv_2214","entity":{"kind":"invoice","id":"inv_2214"},"data":{"amount":420.00}}'
$ fibric events tail --source cn-webhook-bridge
11:02:44 bridge.invoice.paid src=cn-webhook-bridge corr=co-77b1
What to notice: the dedupe_key is built from the sender's own id, so a sender that retries its POST (they all do) produces exactly one envelope. Accepting with 202 before any downstream work is the correct contract: the bridge's promise is durable receipt, not synchronous processing.
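The sign command in the curl example is not defined on this page. A minimal sketch of what such a helper might do, assuming the scheme the handler verifies (hex-encoded HMAC-SHA256 over the raw body). The names signBody and verifyBody and the secret value are illustrative, not part of the SDK or CLI.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Hex-encoded HMAC-SHA256 over the exact bytes that will be sent.
function signBody(secret: string, rawBody: string): string {
  return createHmac('sha256', secret).update(rawBody).digest('hex');
}

// Mirrors the handler's check: compare lengths first, then compare in constant time.
function verifyBody(secret: string, rawBody: string, signatureHex: string): boolean {
  const expected = createHmac('sha256', secret).update(rawBody).digest();
  const given = Buffer.from(signatureHex, 'hex');
  return given.length === expected.length && timingSafeEqual(expected, given);
}

const secret = 'dev-only-secret'; // assumption: a throwaway shared secret for local use
const body = '{"type":"invoice.paid","id":"inv_2214"}';
const sig = signBody(secret, body);

const okSame = verifyBody(secret, body, sig);           // true
const okTampered = verifyBody(secret, body + ' ', sig); // false: one extra byte changes the MAC
console.log(okSame, okTampered);
```

Because the MAC covers the raw bytes, the body that is signed and the body that is posted have to be byte-identical; re-serializing the JSON between signing and sending is enough to produce a 401.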
Temperature watch on MQTT
The example that makes the platform's claim concrete: a thermostat reading and a support message flow through identical machinery. A hardware connector subscribes to an MQTT topic and emits one envelope per reading; an operator subscribes to those envelopes and proposes a setpoint change when a zone drifts. The executor disposes the change under policy, exactly as it would dispose an order hold.
First, the connector. category: 'hardware' is metadata, not a different code path:
import { defineConnector, tool, none } from '@fibric/connector-sdk';
export default defineConnector({
id: 'cn-mqtt-hvac',
version: '1.0.0',
category: 'hardware',
publisher: 'private',
// the broker is on the private network; transport security lives below this layer
auth: none(),
tools: {
'zone.read': tool({
handler: async (ctx, args) => readZoneState(ctx, args.zone_id as string),
}),
// the ONE write: bounded, side-effecting, therefore governed
'zone.setpoint': tool({
sideEffecting: true,
input: (args) => {
const a = args as Record<string, unknown>;
const target = Number(a.target_c);
if (typeof a.zone_id !== 'string') throw new Error('zone_id is required');
if (!Number.isFinite(target) || target < 16 || target > 26) {
throw new Error('target_c must be within 16..26'); // bound in code AND in policy
}
return { zone_id: a.zone_id, target_c: target };
},
handler: async (ctx, args) =>
publishSetpoint(ctx, args.zone_id as string, args.target_c as number),
}),
},
events: {
// one envelope per reading, from the broker subscription
'zone.temperature': { kind: 'webhook', topic: 'site/+/zone/+/temp' },
},
probe: (ctx) => ({
status: 'ok',
metric: { label: 'zones reporting (5m)', value: zonesSeenRecently(ctx) },
}),
});
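The definition above does not show how a broker message becomes an entity reference. A minimal sketch of one way to do it, assuming standard MQTT filter semantics in which + matches exactly one topic level. Both matchTopic and zoneEntity are hypothetical helpers, and treating the second wildcard as the zone id is an assumption chosen to agree with the fixture further down.

```typescript
// Match an MQTT topic against a filter with single-level '+' wildcards.
// Returns the levels captured by each '+', or null when the topic does not match.
// The multi-level '#' wildcard is deliberately not handled in this sketch.
function matchTopic(filter: string, topic: string): string[] | null {
  const f = filter.split('/');
  const t = topic.split('/');
  if (f.length !== t.length) return null;
  const captured: string[] = [];
  for (let i = 0; i < f.length; i++) {
    if (f[i] === '+') captured.push(t[i]);
    else if (f[i] !== t[i]) return null;
  }
  return captured;
}

// Hypothetical mapping from a topic to the entity reference an operator reads.
function zoneEntity(topic: string): { kind: 'zone'; id: string } | null {
  const m = matchTopic('site/+/zone/+/temp', topic);
  return m ? { kind: 'zone', id: m[1] } : null;
}

console.log(zoneEntity('site/hq/zone/12/temp'));     // { kind: 'zone', id: '12' }
console.log(zoneEntity('site/hq/zone/12/humidity')); // null
```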
Then the operator. It watches the stream, ignores jitter, and proposes only when a zone is genuinely out of band, with both keys built deterministically per zone:
import { defineOperator } from '@fibric/sdk';
export default defineOperator({
name: 'temp-watch',
goal: 'Keep every zone within its comfort band, and page a human before acting on server rooms.',
requires: ['zone.read', 'zone.setpoint', 'notify.send'],
model: 'router:default',
trigger: { on: 'zone.temperature' },
async run(ctx) {
const zoneId = ctx.event.payload.entity?.id as string;
// sense current state, not the triggering reading: sensors repeat and drift
const zone = await ctx.sense('zone.read', { zone_id: zoneId });
const drift = zone.temp_c - zone.setpoint_c;
if (Math.abs(drift) < 1.5) {
return { reasoning: `zone ${zoneId} within band (drift ${drift.toFixed(1)}C)`, actions: [] };
}
const verdict = await ctx.reason({ zone, drift }); // model decides what; code decides keys
return {
reasoning: verdict.summary,
actions: [{
connector: ctx.binding('zone.setpoint'),
tool: 'zone.setpoint',
args: { zone_id: zoneId, target_c: verdict.target_c },
entity_key: `zone:${zoneId}`, // one change in flight per zone
idempotency_key: `temp-watch:${zoneId}:${verdict.target_c}`,
}],
};
},
});
The policy is where the physical stakes get their teeth. Ordinary zones adjust freely inside a rate limit; server-room zones alert a human first:
policy: temp-watch-guardrails
applies_to: temp-watch
rules:
- allow: zone.setpoint
limit: { per: hour, max: 12 } # a flapping sensor cannot thrash the plant
single_flight: zone_id
- require_confirmation: zone.setpoint
when: { zone_class: server_room } # people first where the blast radius is real
default: deny
Replay a drifting zone against it. The fixture is two readings for the same zone; the second must dedupe if the model recommends the same target:
{"source":"cn-mqtt-hvac","event_type":"zone.temperature","payload":{"entity":{"kind":"zone","id":"12"},"temp_c":24.9,"setpoint_c":21.0}}
{"source":"cn-mqtt-hvac","event_type":"zone.temperature","payload":{"entity":{"kind":"zone","id":"12"},"temp_c":25.1,"setpoint_c":21.0}}
$ fibric dev replay fixtures/drift.jsonl
replayed 2 envelopes
routed 2 -> temp-watch
disposed zone.setpoint zone:12 ALLOW
         zone.setpoint zone:12 DEDUP
# same recommended target => same idempotency key => one write
What to notice: the idempotency key includes the target value, so a repeated reading that produces the same recommendation collapses to DEDUP, while a genuinely new target is a genuinely new action. And the empty plan is doing real work; most readings should end in { actions: [] }.
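The default-closed evaluation in the guardrails policy can be pictured with a small sketch. This is not the executor's implementation: the rule precedence (a matching confirmation rule wins over a plain allow), the CONFIRM disposition name, and the zone.shutdown tool are all assumptions made for illustration, and rate limits and single_flight are left out.

```typescript
type Disposition = 'ALLOW' | 'CONFIRM' | 'DENY'; // 'CONFIRM' is an illustrative name

interface Action { tool: string; attrs: Record<string, string> }
interface ConfirmRule { tool: string; when: Record<string, string> }
interface Policy { allow: string[]; requireConfirmation: ConfirmRule[] }

// A `when` clause matches if every key it names has the same value on the action.
function matches(when: Record<string, string>, attrs: Record<string, string>): boolean {
  return Object.entries(when).every(([k, v]) => attrs[k] === v);
}

function evaluate(policy: Policy, action: Action): Disposition {
  // Assumption: the most restrictive matching rule wins.
  for (const rule of policy.requireConfirmation) {
    if (rule.tool === action.tool && matches(rule.when, action.attrs)) return 'CONFIRM';
  }
  if (policy.allow.includes(action.tool)) return 'ALLOW';
  return 'DENY'; // default-closed: anything not listed is refused
}

// The two rules from the temp-watch policy, transcribed by hand.
const guardrails: Policy = {
  allow: ['zone.setpoint'],
  requireConfirmation: [{ tool: 'zone.setpoint', when: { zone_class: 'server_room' } }],
};

const office = evaluate(guardrails, { tool: 'zone.setpoint', attrs: { zone_class: 'office' } });
const serverRoom = evaluate(guardrails, { tool: 'zone.setpoint', attrs: { zone_class: 'server_room' } });
const unlisted = evaluate(guardrails, { tool: 'zone.shutdown', attrs: { zone_class: 'office' } });
console.log(office, serverRoom, unlisted); // ALLOW CONFIRM DENY
```

The last case is the one that matters: a tool nobody wrote a rule for is refused without any rule having to say so.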
Order-hold tool with idempotency
The full idempotency contract from exposing actions, honored end to end in one tool. The executor already deduplicates on idempotency_key before your handler runs; the handler's remaining obligation is to be safe against the retries the executor itself cannot see, by converging on the target state instead of blindly transitioning.
import { tool, type ConnectorCtx } from '@fibric/connector-sdk';
import { z } from 'zod';
const HoldArgs = z.object({
order_id: z.string().regex(/^SO-\d+$/),
reason: z.enum(['promise_risk', 'payment_review', 'address_mismatch']),
note: z.string().max(500).optional(),
}).strict();
class PreconditionFailed extends Error {}
export const orderHold = tool({
sideEffecting: true, // routes through the executor + trust policy
input: (args) => HoldArgs.parse(args),
handler: async (ctx: ConnectorCtx, args) => {
const order = await getOrder(ctx, args.order_id as string);
// converge, don't transition: already held is success, not conflict
if (order.status === 'holded') {
ctx.log('hold no-op, already held', { order: args.order_id });
return { held: true, changed: false, status: order.status };
}
// the world moved between proposing and disposing: fail with a stable reason
if (order.status === 'complete' || order.status === 'canceled') {
throw new PreconditionFailed(`order ${args.order_id} is ${order.status}; cannot hold`);
}
// one honest call, reported honestly
const updated = await putOrderStatus(ctx, args.order_id as string, {
status: 'holded',
comment: `[fibric] ${args.reason}${args.note ? `: ${args.note}` : ''}`,
});
return { held: true, changed: true, status: updated.status };
},
});
The layers, and which retry each one absorbs:
| Layer | Absorbs | Where |
|---|---|---|
| idempotency_key dedup | The same plan action proposed or replayed twice. Second disposition is DEDUP; the handler never runs. | Executor, before the handler. |
| entity_key single-flight | Two different actions racing on one order. They serialize; the second sees the first's outcome in its precondition read. | Executor, around the handler. |
| Converging handler | A retry after a timeout where the first attempt actually landed. The re-read sees holded and succeeds without a second write. | Your code, above. |
Prove all three with the two-pass replay:
$ fibric dev replay fixtures/hold.jsonl --twice --strict
pass 1: order.hold SO-11290 ALLOW ok key=ship-risk:SO-11290:hold
pass 2: order.hold SO-11290 DEDUP ok key=ship-risk:SO-11290:hold
strict: 0 blocks, 0 errors
What to notice: changed: false in the no-op return. The receipt records whether the world moved, not only whether the call succeeded, and an audit that can distinguish "held it" from "found it already held" is worth the one extra field.
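The interaction between single-flight and a converging handler is easier to see in a toy model. The sketch below is an in-memory simulation, not the executor: the orders map, the writes counter, and the singleFlight helper are all hypothetical, and the only thing borrowed from the tool above is the converge-on-held logic.

```typescript
type Status = 'processing' | 'holded' | 'complete';
const orders = new Map<string, Status>([['SO-1', 'processing']]);
let writes = 0; // counts real writes to the in-memory order system

// Converging handler: "already held" is success without a second write.
async function hold(orderId: string): Promise<{ held: boolean; changed: boolean }> {
  const status = orders.get(orderId);
  if (status === undefined) throw new Error(`unknown order ${orderId}`);
  if (status === 'holded') return { held: true, changed: false };
  if (status === 'complete') throw new Error(`order ${orderId} is complete; cannot hold`);
  await new Promise((r) => setTimeout(r, 5)); // simulated network latency
  orders.set(orderId, 'holded');
  writes++;
  return { held: true, changed: true };
}

// Single-flight: work for one entity key runs one at a time, in arrival order.
const tails = new Map<string, Promise<unknown>>();
function singleFlight<T>(entityKey: string, fn: () => Promise<T>): Promise<T> {
  const prev = tails.get(entityKey) ?? Promise.resolve();
  const next = prev.then(fn, fn); // start after the previous call settles, even if it failed
  tails.set(entityKey, next.catch(() => undefined));
  return next;
}

// Two different actions race on the same order.
async function demo() {
  const [a, b] = await Promise.all([
    singleFlight('order:SO-1', () => hold('SO-1')),
    singleFlight('order:SO-1', () => hold('SO-1')),
  ]);
  return { a, b, writes };
}

const result = demo();
result.then((r) => console.log(JSON.stringify(r)));
// {"a":{"held":true,"changed":true},"b":{"held":true,"changed":false},"writes":1}
```

Calling hold twice without singleFlight lets both calls read processing before either writes, so the counter reaches 2. Serializing on the entity key is what lets the second call's precondition read see the first call's outcome.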
Nightly report operator
A scheduled operator whose only side effect is a message. It wakes once a night, senses the day through whatever read capabilities the tenant bound, asks the model for a short operational summary, and proposes a single notify.send. It composes naturally with the Slack digest connector: bind message.read and the day's channel traffic becomes part of the picture it reports on.
import { defineOperator } from '@fibric/sdk';
export default defineOperator({
name: 'nightly-report',
goal: 'Every morning, the team reads one honest summary of yesterday: what moved, what stalled, what needs a decision.',
requires: ['orders.read', 'conversations.read', 'message.read', 'notify.send'],
model: 'router:default',
trigger: { every: '24h' }, // the tick arrives as an envelope, source: 'cron'
async run(ctx) {
// the reporting window is derived from the tick, deterministically
const until = new Date(ctx.event.payload.tick_at as string);
const since = new Date(until.getTime() - 24 * 3600 * 1000);
const day = since.toISOString().slice(0, 10);
const [orders, conversations, chatter] = await Promise.all([
ctx.sense('orders.read', { updated_since: since.toISOString() }),
ctx.sense('conversations.read', { updated_since: since.toISOString() }),
ctx.sense('message.read', { channel: ctx.config.digest_channel, since: since.toISOString() }),
]);
// a quiet day produces no message. reporting nothing is a first-class outcome.
if (orders.length === 0 && conversations.length === 0) {
return { reasoning: `no activity in window ${day}; no report sent`, actions: [] };
}
// the model writes prose over sensed data; it cannot add a recipient,
// change the tool, or send twice: those live in code, below.
const report = await ctx.reason({ day, orders, conversations, chatter });
return {
reasoning: `daily report for ${day}: ${orders.length} orders, ${conversations.length} conversations`,
actions: [{
connector: ctx.binding('notify.send'),
tool: 'notify.send',
args: { to: ctx.config.report_recipient, subject: `Operations report, ${day}`, body: report.body },
entity_key: `report:${day}`,
idempotency_key: `nightly-report:${day}`, // one report per day, no matter what
}],
};
},
});
The policy is one rule, and it is still worth writing, because the default-closed evaluation is what turns "the operator only sends reports" from a code-review observation into a runtime guarantee:
policy: nightly-report-guardrails
applies_to: nightly-report
rules:
- allow: notify.send
limit: { per: day, max: 2 } # the report, plus one retry's worth of headroom
default: deny
What to notice: the idempotency key is the date, not a random id. A crashed run that restarts, a replayed tick, a manual fibric operators run nightly-report --once at noon: all of them collapse into the one report that day already sent. If the send itself fails, the key was never marked applied and the retry goes through. This is the 657-message flood inverted into a one-line guarantee.
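A small sketch of the two claims in that paragraph: that different ticks covering the same reporting day produce the same key, and that a failed send leaves the key unrecorded so a retry can succeed. The applied set, the dispose function, and the FAILED label are hypothetical stand-ins for the executor; only the window arithmetic and the key format are taken from the operator above.

```typescript
// Derive the reporting day from the tick, the same way the operator above does.
function reportDay(tickAt: string): string {
  const until = new Date(tickAt);
  const since = new Date(until.getTime() - 24 * 3600 * 1000);
  return since.toISOString().slice(0, 10);
}

const applied = new Set<string>(); // stand-in for the executor's applied-key store

// Hypothetical executor step: the key is recorded only after the send succeeds.
function dispose(key: string, send: () => void): 'ALLOW' | 'DEDUP' | 'FAILED' {
  if (applied.has(key)) return 'DEDUP';
  try {
    send();
  } catch {
    return 'FAILED'; // key not recorded, so a retry can go through
  }
  applied.add(key);
  return 'ALLOW';
}

const scheduledKey = `nightly-report:${reportDay('2024-03-02T00:00:00Z')}`;
const manualKey = `nightly-report:${reportDay('2024-03-02T12:00:00Z')}`; // a manual run at noon

const sent: string[] = [];
const results = [
  dispose(scheduledKey, () => { throw new Error('send timed out'); }), // first attempt fails
  dispose(scheduledKey, () => { sent.push(scheduledKey); }),           // retry succeeds
  dispose(manualKey, () => { sent.push(manualKey); }),                 // same day, same key
];
console.log(scheduledKey, results.join(','), sent.length);
// nightly-report:2024-03-01 FAILED,ALLOW,DEDUP 1
```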
Running any of these
The loop is the same for all five projects, and it is the loop from local development:
# scaffold, then paste the example over src/
fibric init temp-watch --template operator
cd temp-watch && npm install
# terminal 1: the local kernel, side effects stubbed, policy loaded
fibric dev
# terminal 2: push envelopes through and read the dispositions
fibric dev replay fixtures/sample.jsonl
# the test that gates every publish: second pass must be all DEDUP
fibric dev replay fixtures/sample.jsonl --twice --strict
Side-effecting handlers are stubbed under fibric dev: validation runs, the policy evaluates, the disposition and receipt are real, and no thermostat moves. When stubs are no longer enough, attach a sandbox connection as described in pointing local code at a sandbox. From there, testing connectors turns the replay into CI, and publishing covers review and the marketplace.
Keep going
- Connector manifest: every field these examples set, and the ones they left at defaults.
- Emitting events: dedupe keys, entity references, and backfill for the two ingesting examples.
- Exposing actions: the full contract the order-hold tool implements.
- Defining operators: triggers, the reasoning contract, and the allowlist behind temp-watch and nightly-report.
- Local development: the dev server, tailing, and replay flags used throughout.