Caching

Flowgen provides a cache for state management, deduplication, and leader election.

Backends

BackendDescription
NATS JetStream KVDistributed key-value store. Shared across all replicas.
In-memoryLocal HashMap. Single-node deployments. Lost on restart.

If NATS cache is enabled but fails to connect, flowgen falls back to in-memory automatically. If cache is disabled or not configured, in-memory is used.

Configuration

cache:
  enabled: true
  type: nats
  credentials_path: /etc/nats/credentials.json
  url: nats://localhost:4222
  db_name: flowgen_cache
  history: 10
  tombstone_ttl: "1h"
FieldTypeDefaultDescription
enabledboolrequiredEnable distributed cache.
typestringrequiredCache backend: nats.
credentials_pathstringrequiredPath to NATS credentials.
urlstringlocalhost:4222NATS server URL.
db_namestringflowgen_cacheKV bucket name.
historyint10Historical entries retained per key.
tombstone_ttlduration1hTTL for delete markers. Enables per-key TTL on entries.

Operations

The cache is exposed to Rhai scripts via ctx.cache. Keys are namespaced by the flow name, so two flows can use the same logical key without colliding.

OperationReturnsDescription
ctx.cache.get(key)() or stringRead a value. Returns () if not found.
ctx.cache.put(key, value)boolStore a value (string or integer) with no expiration.
ctx.cache.put(key, value, ttl_secs)boolStore with a time-to-live in seconds.
ctx.cache.delete(key)boolDelete a key.
ctx.cache.list_keys(prefix)array of stringsList keys matching a prefix (with the flow-name namespace stripped from results).

Access in scripts

// Write without expiration.
ctx.cache.put("order." + event.data.id, event.data.status);

// Write with TTL (seconds).
ctx.cache.put("seen." + event.data.id, "1", 3600);

// Read.
let status = ctx.cache.get("order." + event.data.id);

// Delete.
ctx.cache.delete("order." + event.data.id);

// List.
let keys = ctx.cache.list_keys("order.");
for key in keys {
    print(key);
}

Use cases

Deduplication:

let key = "seen." + event.data.id;
if ctx.cache.get(key) != () {
  return ();
}
ctx.cache.put(key, "1", 86400);
event

State tracking:

let key = "last_sync." + event.data.source;
let last = ctx.cache.get(key);
ctx.cache.put(key, timestamp_to_iso(timestamp_now()));
event.data.last_sync = last;
event

Leader election

The cache is also used for distributed leader election. When require_leader_election: true is set on a flow, flowgen uses the KV store to coordinate which replica runs the flow. Only one replica holds the lease at a time.