Fibric. Docs fibric.io →
v1.0.0 ยท stable
Build

Example projects

Five complete, runnable projects, each small enough to read in one sitting and each demonstrating one discipline the platform cares about: a read-only Slack connector that only senses, a bridge that turns arbitrary webhooks into governed envelopes, an MQTT operator that proves hardware and SaaS flow through identical machinery, an action tool that honors the idempotency contract in full, and a scheduled operator that reports without acting. Every example uses the real SDK surface from the Connector SDK and runs against the local kernel from local development.

How to read these

Each example is a complete project: one defineConnector() or defineOperator() def, a fixture file where replay is the point, and the commands to run it. The code is shown in full rather than in fragments, because the discipline these examples teach lives in the parts fragments usually omit: the dedupe keys, the preconditions, the honest error paths.

Conventions shared by all five:

ExampleKindThe discipline it demonstrates
Slack digestconnector, read-onlyIngesting a poll-based stream with stable dedupe keys and no write surface at all.
Webhook bridgeconnector, ingest-onlyVerifying, refusing, and mapping arbitrary POSTs onto the canonical envelope.
Temperature watchconnector + operatorHardware through the same machinery: sense MQTT, propose a setpoint, dispose under policy.
Order holdaction toolThe full idempotency contract: dedup, single-flight, and a converging handler.
Nightly reportoperator, scheduledA date-keyed side effect that structurally cannot send twice, and an empty plan on quiet days.

Read-only Slack digest connector

The smallest honest connector: it polls the Slack conversations API for new messages in the channels you configure, turns each one into an envelope, and exposes exactly one tool, a read. There is no write path at all, which means no policy question ever arises; an operator downstream can summarize, count, or correlate, and anything it wants to do about a message must go through some other connector's governed action.

src/index.ts — cn-slack-digest
import { defineConnector, tool, oauth2, type ConnectorCtx } from '@fibric/connector-sdk';

interface SlackMessage {
  ts: string;          // Slack's message timestamp doubles as its id
  channel: string;
  user: string;
  text: string;
  thread_ts?: string;
}

async function fetchNewMessages(ctx: ConnectorCtx): Promise<SlackMessage[]> {
  const channels = (ctx.config.channels as string[]) ?? [];
  const out: SlackMessage[] = [];
  for (const channel of channels) {
    // in prod, ctx.http is per-tenant, rate-limited, retrying, creds injected
    const res = await slackApi(ctx, 'conversations.history', {
      channel,
      oldest: String(ctx.config.cursor ?? 0),
      limit: 200,
    });
    for (const m of res.messages) out.push({ ...m, channel });
  }
  return out;
}

export default defineConnector({
  id: 'cn-slack-digest',
  version: '1.0.0',
  category: 'comms',
  publisher: 'private',

  auth: oauth2({ scopes: ['channels:history', 'channels:read'] }),

  tools: {
    // the one read an operator can sense through. no sideEffecting flag:
    // reads need no policy and no idempotency.
    'message.read': tool({
      input: (args) => {
        const a = args as Record<string, unknown>;
        if (typeof a.channel !== 'string') throw new Error('channel is required');
        return a;
      },
      handler: async (ctx, args) =>
        slackApi(ctx, 'conversations.history', { channel: args.channel, limit: 50 }),
    }),
  },

  events: {
    // the digest stream: each new message becomes one envelope
    'message.posted': { kind: 'poll', topic: 'conversations.history' },
  },

  probe: (ctx) => ({
    status: 'ok',
    metric: { label: 'channels watched', value: (ctx.config.channels as string[])?.length ?? 0 },
  }),
});

The poll handler maps each message onto the envelope fields documented in emitting events. Slack's ts is the natural dedupe key: it is unique per channel and stable across polls, so a poll window that overlaps the previous one re-emits nothing.

the poll → envelope mapping
export async function poll(ctx: ConnectorCtx, emit: (e: EmitInput) => void) {
  for (const m of await fetchNewMessages(ctx)) {
    emit({
      event_type: 'message.posted',
      dedupe_key: `slack:${m.channel}:${m.ts}`,   // stable across overlapping polls
      payload: {
        entity: { kind: 'channel', id: m.channel },
        user: m.user,
        text: m.text,
        thread_ts: m.thread_ts ?? null,
      },
    });
  }
}

What to notice: the connector never summarizes. Digesting is reasoning, and reasoning belongs to an operator; the connector's whole job is to make the raw signal arrive as well-formed, deduplicated envelopes. The nightly report operator below is a natural consumer.

Webhook-to-envelope bridge

Plenty of systems can POST JSON somewhere but will never have a first-class connector. The bridge accepts any webhook, validates the little it can, and maps it onto the canonical EventEnvelope shape so the rest of the platform never knows the source was ad hoc. This is the pattern behind every kind: 'webhook' event stream; here it is isolated so you can see all of it.

src/index.ts — cn-webhook-bridge
import { defineConnector, apiKey } from '@fibric/connector-sdk';
import { createHmac, timingSafeEqual } from 'node:crypto';

export default defineConnector({
  id: 'cn-webhook-bridge',
  version: '1.0.0',
  category: 'data',
  publisher: 'private',

  // the sender authenticates to US with a shared secret; see connector auth patterns
  auth: apiKey(),

  tools: {},   // nothing to read back, nothing to act on: this connector only ingests

  events: {
    'bridge.received': { kind: 'webhook' },
  },
});

The webhook handler does three jobs, in order: verify the sender, refuse what it cannot map, and emit an envelope whose fields mean what the platform says they mean. The kernel stamps tenant_id, reseller_id, and event_id; the bridge's responsibility is event_type, correlation_id, the entity reference, and the dedupe key.

the webhook handler
interface BridgePayload {
  type: string;                       // becomes event_type, namespaced below
  id: string;                         // the sender's id for this occurrence
  entity?: { kind: string; id: string };
  data?: Record<string, unknown>;
  correlation?: string;
}

export async function onWebhook(ctx: ConnectorCtx, req: WebhookRequest, emit: (e: EmitInput) => void) {
  // 1. verify: HMAC over the raw body with the shared secret
  const mac = createHmac('sha256', ctx.secret('signing_key')).update(req.rawBody).digest();
  const given = Buffer.from(req.headers['x-bridge-signature'] ?? '', 'hex');
  if (given.length !== mac.length || !timingSafeEqual(mac, given)) {
    return req.reject(401, 'bad signature');
  }

  // 2. refuse what cannot be mapped; a webhook with no type or id is noise, not signal
  const body = req.json as BridgePayload;
  if (!body?.type || !body?.id) return req.reject(422, 'type and id are required');
  if (!/^[a-z_]+(\.[a-z_]+)+$/.test(body.type)) {
    return req.reject(422, 'type must be noun.verb, e.g. invoice.paid');
  }

  // 3. emit: one occurrence, one envelope, deduplicated on the sender's id
  emit({
    event_type: `bridge.${body.type}`,
    dedupe_key: `bridge:${body.type}:${body.id}`,   // sender retries collapse here
    correlation_id: body.correlation,               // optional; kernel generates if absent
    payload: {
      entity: body.entity ?? null,
      ...body.data,
    },
  });
  req.accept(202);
}

Point anything at it and watch the stream normalize:

terminal
$ fibric dev            # webhooks on :4310
$ curl -s -X POST http://localhost:4310/webhooks/cn-webhook-bridge \
    -H "x-bridge-signature: $(sign payload.json)" \
    -d '{"type":"invoice.paid","id":"inv_2214","entity":{"kind":"invoice","id":"inv_2214"},"data":{"amount":420.00}}'

$ fibric events tail --source cn-webhook-bridge
  11:02:44  bridge.invoice.paid   src=cn-webhook-bridge   corr=co-77b1

What to notice: the dedupe_key is built from the sender's own id, so a sender that retries its POST (they all do) produces exactly one envelope. Accepting with 202 before any downstream work is the correct contract: the bridge's promise is durable receipt, not synchronous processing.

Temperature watch on MQTT

The example that makes the platform's claim concrete: a thermostat reading and a support message flow through identical machinery. A hardware connector subscribes to an MQTT topic and emits one envelope per reading; an operator subscribes to those envelopes and proposes a setpoint change when a zone drifts. The executor disposes the change under policy, exactly as it would dispose an order hold.

First, the connector. category: 'hardware' is metadata, not a different code path:

src/index.ts — cn-mqtt-hvac
import { defineConnector, tool, none } from '@fibric/connector-sdk';

export default defineConnector({
  id: 'cn-mqtt-hvac',
  version: '1.0.0',
  category: 'hardware',
  publisher: 'private',

  // the broker is on the private network; transport security lives below this layer
  auth: none(),

  tools: {
    'zone.read': tool({
      handler: async (ctx, args) => readZoneState(ctx, args.zone_id as string),
    }),

    // the ONE write: bounded, side-effecting, therefore governed
    'zone.setpoint': tool({
      sideEffecting: true,
      input: (args) => {
        const a = args as Record<string, unknown>;
        const target = Number(a.target_c);
        if (typeof a.zone_id !== 'string') throw new Error('zone_id is required');
        if (!Number.isFinite(target) || target < 16 || target > 26) {
          throw new Error('target_c must be within 16..26');   // bound in code AND in policy
        }
        return { zone_id: a.zone_id, target_c: target };
      },
      handler: async (ctx, args) =>
        publishSetpoint(ctx, args.zone_id as string, args.target_c as number),
    }),
  },

  events: {
    // one envelope per reading, from the broker subscription
    'zone.temperature': { kind: 'webhook', topic: 'site/+/zone/+/temp' },
  },

  probe: (ctx) => ({
    status: 'ok',
    metric: { label: 'zones reporting (5m)', value: zonesSeenRecently(ctx) },
  }),
});

Then the operator. It watches the stream, ignores jitter, and proposes only when a zone is genuinely out of band, with both keys built deterministically per zone:

src/operator.ts — temp-watch
import { defineOperator } from '@fibric/sdk';

export default defineOperator({
  name: 'temp-watch',
  goal: 'Keep every zone within its comfort band, and page a human before acting on server rooms.',
  requires: ['zone.read', 'zone.setpoint', 'notify.send'],
  model: 'router:default',
  trigger: { on: 'zone.temperature' },

  async run(ctx) {
    const zoneId = ctx.event.payload.entity?.id as string;

    // sense current state, not the triggering reading: sensors repeat and drift
    const zone = await ctx.sense('zone.read', { zone_id: zoneId });
    const drift = zone.temp_c - zone.setpoint_c;

    if (Math.abs(drift) < 1.5) {
      return { reasoning: `zone ${zoneId} within band (drift ${drift.toFixed(1)}C)`, actions: [] };
    }

    const verdict = await ctx.reason({ zone, drift });   // model decides what; code decides keys

    return {
      reasoning: verdict.summary,
      actions: [{
        connector: ctx.binding('zone.setpoint'),
        tool: 'zone.setpoint',
        args: { zone_id: zoneId, target_c: verdict.target_c },
        entity_key: `zone:${zoneId}`,                     // one change in flight per zone
        idempotency_key: `temp-watch:${zoneId}:${verdict.target_c}`,
      }],
    };
  },
});

The policy is where the physical stakes get their teeth. Ordinary zones adjust freely inside a rate limit; server-room zones alert a human first:

policy/trust.yaml
policy: temp-watch-guardrails
applies_to: temp-watch

rules:
  - allow: zone.setpoint
    limit: { per: hour, max: 12 }        # a flapping sensor cannot thrash the plant
    single_flight: zone_id

  - require_confirmation: zone.setpoint
    when: { zone_class: server_room }    # people first where the blast radius is real

default: deny

Replay a drifting zone against it. The fixture is two readings for the same zone; the second must dedupe if the model recommends the same target:

fixtures/drift.jsonl
{"source":"cn-mqtt-hvac","event_type":"zone.temperature","payload":{"entity":{"kind":"zone","id":"12"},"temp_c":24.9,"setpoint_c":21.0}}
{"source":"cn-mqtt-hvac","event_type":"zone.temperature","payload":{"entity":{"kind":"zone","id":"12"},"temp_c":25.1,"setpoint_c":21.0}}
terminal
$ fibric dev replay fixtures/drift.jsonl
  replayed 2 envelopes
  routed   2 -> temp-watch
  disposed zone.setpoint zone:12  ALLOW   zone.setpoint zone:12  DEDUP
           # same recommended target => same idempotency key => one write

What to notice: the idempotency key includes the target value, so a repeated reading that produces the same recommendation collapses to DEDUP, while a genuinely new target is a genuinely new action. And the empty plan is doing real work; most readings should end in { actions: [] }.

Order-hold tool with idempotency

The full idempotency contract from exposing actions, honored end to end in one tool. The executor already deduplicates on idempotency_key before your handler runs; the handler's remaining obligation is to be safe against the retries the executor itself cannot see, by converging on the target state instead of blindly transitioning.

src/index.ts — the order.hold tool
import { tool, type ConnectorCtx } from '@fibric/connector-sdk';
import { z } from 'zod';

const HoldArgs = z.object({
  order_id: z.string().regex(/^SO-\d+$/),
  reason: z.enum(['promise_risk', 'payment_review', 'address_mismatch']),
  note: z.string().max(500).optional(),
}).strict();

class PreconditionFailed extends Error {}

export const orderHold = tool({
  sideEffecting: true,                  // routes through the executor + trust policy

  input: (args) => HoldArgs.parse(args),

  handler: async (ctx: ConnectorCtx, args) => {
    const order = await getOrder(ctx, args.order_id as string);

    // converge, don't transition: already held is success, not conflict
    if (order.status === 'holded') {
      ctx.log('hold no-op, already held', { order: args.order_id });
      return { held: true, changed: false, status: order.status };
    }

    // the world moved between proposing and disposing: fail with a stable reason
    if (order.status === 'complete' || order.status === 'canceled') {
      throw new PreconditionFailed(`order ${args.order_id} is ${order.status}; cannot hold`);
    }

    // one honest call, reported honestly
    const updated = await putOrderStatus(ctx, args.order_id as string, {
      status: 'holded',
      comment: `[fibric] ${args.reason}${args.note ? `: ${args.note}` : ''}`,
    });
    return { held: true, changed: true, status: updated.status };
  },
});

The layers, and which retry each one absorbs:

LayerAbsorbsWhere
idempotency_key dedupThe same plan action proposed or replayed twice. Second disposition is DEDUP; the handler never runs.Executor, before the handler.
entity_key single-flightTwo different actions racing on one order. They serialize; the second sees the first's outcome in its precondition read.Executor, around the handler.
Converging handlerA retry after a timeout where the first attempt actually landed. The re-read sees holded and succeeds without a second write.Your code, above.

Prove all three with the two-pass replay:

terminal
$ fibric dev replay fixtures/hold.jsonl --twice --strict
  pass 1: order.hold SO-11290  ALLOW  ok   key=ship-risk:SO-11290:hold
  pass 2: order.hold SO-11290  DEDUP  ok   key=ship-risk:SO-11290:hold
  strict: 0 blocks, 0 errors

What to notice: changed: false in the no-op return. The receipt records whether the world moved, not only whether the call succeeded, and an audit that can distinguish "held it" from "found it already held" is worth the one extra field.

Nightly report operator

A scheduled operator whose only side effect is a message. It wakes once a night, senses the day through whatever read capabilities the tenant bound, asks the model for a short operational summary, and proposes a single notify.send. It composes naturally with the Slack digest connector: bind message.read and the day's channel traffic becomes part of the picture it reports on.

src/operator.ts — nightly-report
import { defineOperator } from '@fibric/sdk';

export default defineOperator({
  name: 'nightly-report',
  goal: 'Every morning, the team reads one honest summary of yesterday: what moved, what stalled, what needs a decision.',
  requires: ['orders.read', 'conversations.read', 'message.read', 'notify.send'],
  model: 'router:default',
  trigger: { every: '24h' },              // the tick arrives as an envelope, source: 'cron'

  async run(ctx) {
    // the reporting window is derived from the tick, deterministically
    const until = new Date(ctx.event.payload.tick_at as string);
    const since = new Date(until.getTime() - 24 * 3600 * 1000);
    const day = since.toISOString().slice(0, 10);

    const [orders, conversations, chatter] = await Promise.all([
      ctx.sense('orders.read', { updated_since: since.toISOString() }),
      ctx.sense('conversations.read', { updated_since: since.toISOString() }),
      ctx.sense('message.read', { channel: ctx.config.digest_channel, since: since.toISOString() }),
    ]);

    // a quiet day produces no message. reporting nothing is a first-class outcome.
    if (orders.length === 0 && conversations.length === 0) {
      return { reasoning: `no activity in window ${day}; no report sent`, actions: [] };
    }

    // the model writes prose over sensed data; it cannot add a recipient,
    // change the tool, or send twice: those live in code, below.
    const report = await ctx.reason({ day, orders, conversations, chatter });

    return {
      reasoning: `daily report for ${day}: ${orders.length} orders, ${conversations.length} conversations`,
      actions: [{
        connector: ctx.binding('notify.send'),
        tool: 'notify.send',
        args: { to: ctx.config.report_recipient, subject: `Operations report, ${day}`, body: report.body },
        entity_key: `report:${day}`,
        idempotency_key: `nightly-report:${day}`,   // one report per day, no matter what
      }],
    };
  },
});

The policy is one rule, and it is still worth writing, because the default-closed evaluation is what turns "the operator only sends reports" from a code-review observation into a runtime guarantee:

policy/trust.yaml
policy: nightly-report-guardrails
applies_to: nightly-report

rules:
  - allow: notify.send
    limit: { per: day, max: 2 }     # the report, plus one retry's worth of headroom

default: deny

What to notice: the idempotency key is the date, not a random id. A crashed run that restarts, a replayed tick, a manual fibric operators run nightly-report --once at noon: all of them collapse into the one report that day already sent. If the send itself fails, the key was never marked applied and the retry goes through. This is the 657-message flood inverted into a one-line guarantee.

Running any of these

The loop is the same for all five projects, and it is the loop from local development:

terminal
# scaffold, then paste the example over src/
fibric init temp-watch --template operator
cd temp-watch && npm install

# terminal 1: the local kernel, side effects stubbed, policy loaded
fibric dev

# terminal 2: push envelopes through and read the dispositions
fibric dev replay fixtures/sample.jsonl

# the test that gates every publish: second pass must be all DEDUP
fibric dev replay fixtures/sample.jsonl --twice --strict

Side-effecting handlers are stubbed under fibric dev: validation runs, the policy evaluates, the disposition and receipt are real, and no thermostat moves. When stubs are no longer enough, attach a sandbox connection per pointing local code at a sandbox. From there, testing connectors turns the replay into CI, and publishing covers review and the marketplace.

Keep going