Automation & Monitoring
Wire pipelines, transaction queues, and telemetry exporters for ops teams.
Pipelines & jobs
import { AutomationPipeline, createBalanceSnapshotJob } from "@qubiq/core";
const pipeline = new AutomationPipeline();
pipeline.addTask({
  name: "balances",
  intervalMs: 60_000,
  runOnStart: true,
  job: createBalanceSnapshotJob({
    identities: ["SUZ..."],
    fetchBalance: (id) => client.getBalance(id),
    onSnapshot: (snapshots) => console.log(snapshots),
  }),
});
await pipeline.start();
Add multiple tasks (proposal pollers, custom lambdas) and stop gracefully via pipeline.stop().
Transaction queue
const queue = new TransactionQueue({ wallet, client });
queue.addDispatchListener(({ item, attempt }) => console.log("dispatch", item, attempt));
queue.addRetryListener(({ item, attempt, error }) => console.warn("retry", attempt, error));
queue.enqueue({ destinationPublicKey: "cd".repeat(32), amount: BigInt(1_000_000) });
The queue handles tick scheduling, signing, retries, and metadata emission.
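To make the dispatch and retry events above more concrete, here is a minimal, self-contained sketch of a retry loop that notifies listeners with the same `{ item, attempt }` and `{ item, attempt, error }` payload shapes. It is not the TransactionQueue implementation: `dispatchWithRetry`, `maxAttempts`, and the `send` callback are hypothetical names introduced for illustration, and tick scheduling, signing, and backoff are left out.

```typescript
// Hypothetical listener shapes, modeled on the payloads used in the snippet above.
type DispatchListener<T> = (event: { item: T; attempt: number }) => void;
type RetryListener<T> = (event: { item: T; attempt: number; error: unknown }) => void;

interface RetryOptions<T> {
  maxAttempts: number; // assumed knob; the real queue may configure retries differently
  onDispatch?: DispatchListener<T>;
  onRetry?: RetryListener<T>;
}

// Tries to send an item up to maxAttempts times and returns the attempt
// number that succeeded. Rethrows the last error once attempts run out.
async function dispatchWithRetry<T>(
  item: T,
  send: (item: T) => Promise<void>,
  opts: RetryOptions<T>,
): Promise<number> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= opts.maxAttempts; attempt++) {
    opts.onDispatch?.({ item, attempt });
    try {
      await send(item);
      return attempt;
    } catch (error) {
      lastError = error;
      // Only announce a retry if another attempt will actually follow.
      if (attempt < opts.maxAttempts) {
        opts.onRetry?.({ item, attempt, error });
      }
    }
  }
  throw lastError;
}
```

A production queue would usually wait between attempts (for example until the next tick) rather than retrying immediately as this sketch does.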
Runtime profiles
const runtime = createAutomationRuntime("mainnet", {
  onBalanceSnapshot: console.log,
  onBalanceChange: ({ identity, current }) => console.log(identity, current.balance),
  onTickSample: (sample) => metrics.recordTick(sample),
});
await runtime.start();
Profiles bootstrap watchers, tick monitors, Prometheus server, and pipelines based on named environments (mainnet/testnet/custom).
Proposal polling
CCF proposal fetching is now opt-in to avoid unnecessary network calls. Set enableProposals: true on your profile, or provide an onProposals handler when creating the runtime:
const runtime = createAutomationRuntime(
  {
    name: "ops",
    liveServiceBaseUrl: "https://api.qubic.org",
    balanceIdentities: [],
    enableProposals: true,
  },
  {
    onProposals: ({ activeIndices }) => console.log("active", activeIndices.length),
  },
);
Monitoring & telemetry
- TickMonitor + BalanceMonitor emit sample events with deltas, epoch, ms timing.
- TelemetryMetricsRegistry tracks latest samples and request metrics.
- PrometheusMetricsServer exposes /metrics; combine with dashboards.
- instrumentRequest wraps async calls, recording duration + errors automatically.
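The idea of wrapping an async call so that its duration and failures are recorded automatically can be sketched as follows. This is a standalone illustration, not the library's instrumentRequest: the `instrument` function, the `RequestMetric` shape, and the `record` callback are all assumptions made for the example.

```typescript
// Hypothetical metric shape; the real registry may track different fields.
interface RequestMetric {
  name: string;
  durationMs: number;
  ok: boolean;
}

// Runs fn, records how long it took and whether it succeeded,
// then passes the result (or the original error) through to the caller.
async function instrument<T>(
  name: string,
  record: (metric: RequestMetric) => void,
  fn: () => Promise<T>,
): Promise<T> {
  const startedAt = Date.now();
  try {
    const result = await fn();
    record({ name, durationMs: Date.now() - startedAt, ok: true });
    return result;
  } catch (error) {
    record({ name, durationMs: Date.now() - startedAt, ok: false });
    throw error; // rethrow so callers still see the failure
  }
}
```

Rethrowing keeps the wrapper transparent: callers handle errors exactly as they would without instrumentation, and the metric is recorded as a side effect.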
Task config essentials
| Field | Description |
|---|---|
| name | Unique label used in logs/metrics. |
| intervalMs | How often the job runs. Combine with runOnStart for immediate kicks. |
| job(context) | Async function that receives { signal, logger, metadata }. Respect signal.aborted for clean shutdowns. |
| metadata | Free-form object passed to the job; use it for IDs, config, etc. |
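As a rough illustration of respecting signal.aborted, the sketch below checks the signal between units of work and stops early once shutdown has been requested. The `JobContext` and `JobLogger` interfaces are inferred from the table and are assumptions, as are `snapshotJob` and the `fetchBalance` callback; the library's real types may differ.

```typescript
// Hypothetical shapes inferred from the table; not the library's actual types.
interface JobLogger {
  info(message: string, fields?: Record<string, unknown>): void;
}
interface JobContext<M> {
  signal: AbortSignal;
  logger: JobLogger;
  metadata: M;
}

interface BalanceEntry {
  identity: string;
  balance: number;
}

// Fetches balances one identity at a time and bails out cleanly
// as soon as the abort signal fires, returning what it has so far.
async function snapshotJob(
  ctx: JobContext<{ identities: string[] }>,
  fetchBalance: (identity: string) => Promise<number>,
): Promise<BalanceEntry[]> {
  const results: BalanceEntry[] = [];
  for (const identity of ctx.metadata.identities) {
    if (ctx.signal.aborted) {
      ctx.logger.info("aborted early", { done: results.length });
      break;
    }
    results.push({ identity, balance: await fetchBalance(identity) });
  }
  return results;
}
```

Checking the signal once per iteration keeps shutdown latency bounded by a single fetch instead of the whole identity list.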
Putting it together:
pipeline.addTask({
  name: "snapshot:validators",
  intervalMs: 30_000,
  metadata: { identities: validators },
  job: async ({ metadata, logger }) => {
    const snapshots = await fetchBalances(metadata.identities);
    logger.info("snapshot complete", { size: snapshots.length });
  },
});
Smoke tests
Two optional live tests hit public endpoints:
QUBIC_SMOKE_TESTS=true bun test tests/smoke/live-service-smoke.test.ts
They fetch tick info + a known balance to ensure connectivity. Keep them opt-in for CI stability.
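One way to keep such tests opt-in is to gate them on the environment variable before any network call is made. The helper below is a minimal sketch: `smokeTestsEnabled` is a hypothetical name, and the exact-match check against the string "true" is an assumption about how the flag is read.

```typescript
// Returns true only when the opt-in flag is set to the literal string "true".
// Assumption: any other value (unset, "false", "1") leaves smoke tests disabled.
function smokeTestsEnabled(env: Record<string, string | undefined>): boolean {
  return env["QUBIC_SMOKE_TESTS"] === "true";
}
```

In a test file this would typically be called with the process environment, and the live tests skipped when it returns false, so a default CI run never touches public endpoints.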