QubicKit Docs
CoreAutomation & Ops

Automation & Monitoring

Wire pipelines, transaction queues, and telemetry exporters for ops teams.

Pipelines & jobs

import { AutomationPipeline, createBalanceSnapshotJob } from "@qubiq/core";

const pipeline = new AutomationPipeline();
pipeline.addTask({
  name: "balances",
  intervalMs: 60_000,
  runOnStart: true,
  job: createBalanceSnapshotJob({
    identities: ["SUZ..."],
    fetchBalance: (id) => client.getBalance(id),
    onSnapshot: (snapshots) => console.log(snapshots),
  }),
});
await pipeline.start();

Add multiple tasks (proposal pollers, custom lambdas) and stop gracefully via pipeline.stop().

Transaction queue

const queue = new TransactionQueue({ wallet, client });
queue.addDispatchListener(({ item, attempt }) => console.log("dispatch", item, attempt));
queue.addRetryListener(({ item, attempt, error }) => console.warn("retry", attempt, error));
queue.enqueue({ destinationPublicKey: "cd".repeat(32), amount: BigInt(1_000_000) });

The queue handles tick scheduling, signing, retries, and metadata emission.

Runtime profiles

const runtime = createAutomationRuntime("mainnet", {
  onBalanceSnapshot: console.log,
  onBalanceChange: ({ identity, current }) => console.log(identity, current.balance),
  onTickSample: (sample) => metrics.recordTick(sample),
});
await runtime.start();

Profiles bootstrap watchers, tick monitors, Prometheus server, and pipelines based on named environments (mainnet/testnet/custom).

Proposal polling

CCF fetching is now opt-in to avoid unnecessary network calls. Set enableProposals: true on your profile or provide onProposals when creating the runtime:

const runtime = createAutomationRuntime(
  {
    name: "ops",
    liveServiceBaseUrl: "https://api.qubic.org",
    balanceIdentities: [],
    enableProposals: true,
  },
  {
    onProposals: ({ activeIndices }) => console.log("active", activeIndices.length),
  },
);

Monitoring & telemetry

  • TickMonitor + BalanceMonitor emit sample events with deltas, epoch, ms timing.
  • TelemetryMetricsRegistry tracks latest samples and request metrics.
  • PrometheusMetricsServer exposes /metrics; combine with dashboards.
  • instrumentRequest wraps async calls, recording duration + errors automatically.

Task config essentials

FieldDescription
nameUnique label used in logs/metrics.
intervalMsHow often the job runs. Combine with runOnStart for immediate kicks.
job(context)Async function that receives { signal, logger, metadata }. Respect signal.aborted for clean shutdowns.
metadataFree-form object passed to the job—use it for IDs, config, etc.

Putting it together:

pipeline.addTask({
  name: "snapshot:validators",
  intervalMs: 30_000,
  metadata: { identities: validators },
  job: async ({ metadata, logger }) => {
    const snapshots = await fetchBalances(metadata.identities);
    logger.info("snapshot complete", { size: snapshots.length });
  },
});

Smoke tests

Two optional live tests hit public endpoints:

QUBIC_SMOKE_TESTS=true bun test tests/smoke/live-service-smoke.test.ts

They fetch tick info + a known balance to ensure connectivity. Keep them opt-in for CI stability.