NATS JetStream Subscriber

Consumes messages from a NATS JetStream stream. This is a source task, so it is typically the first task in a flow.

Configuration

- nats_jetstream_subscriber:
    name: order_events
    credentials_path: /etc/nats/credentials.json
    url: nats://localhost:4222
    subject: "orders.>"
    durable_name: order_processor
    stream:
      name: ORDERS
      subjects:
        - "orders.>"
      create_or_update: true
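
The subject field accepts the standard NATS wildcards: `*` matches exactly one token, and `>` matches one or more trailing tokens and must be the last token. As a rough sketch of the difference, the two tasks below filter the same stream differently. The task names and durable names here are hypothetical and chosen only for illustration.

```yaml
# Matches orders.created and orders.cancelled, but not orders.eu.created.
- nats_jetstream_subscriber:
    name: top_level_orders            # hypothetical task name
    credentials_path: /etc/nats/credentials.json
    subject: "orders.*"
    durable_name: top_level_orders    # hypothetical durable name
    stream:
      name: ORDERS
      subjects:
        - "orders.>"

# Matches orders.created and orders.eu.created, but not the bare subject "orders".
- nats_jetstream_subscriber:
    name: all_orders                  # hypothetical task name
    credentials_path: /etc/nats/credentials.json
    subject: "orders.>"
    durable_name: all_orders          # hypothetical durable name
    stream:
      name: ORDERS
      subjects:
        - "orders.>"
```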

Fields

| Field | Type | Default | Description |
|---|---|---|---|
| name | string | required | Task name. |
| credentials_path | string | required | Path to NATS credentials file. |
| url | string | localhost:4222 | NATS server URL. |
| subject | string | required | Subject to subscribe to (supports wildcards). |
| durable_name | string | | Durable consumer name for persistent subscriptions. |
| stream | object | | Stream configuration (see below). |
| max_messages | int | | Max messages per batch fetch. |
| max_ack_pending | int | | Max unacknowledged messages. |
| max_deliver | int | | Max delivery attempts before discarding. |
| delay | duration | | Delay between fetch requests. |
| throttle | duration | | Delay between individual messages. |
| ack_timeout | duration | | Acknowledgment timeout. |
| backoff | list | | Redelivery backoff schedule (list of durations). |
| depends_on | list | | Upstream task names. |
| retry | object | | Retry configuration. |
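
The batching and redelivery fields are easiest to read together. The fragment below is a minimal sketch, not a recommended setup: the numeric values are arbitrary, and the duration strings ("1s", "30s") assume a Go-style duration syntax that this page does not confirm. In JetStream itself, max_deliver must be greater than the number of backoff entries, so the sketch uses three backoff steps with four delivery attempts.

```yaml
- nats_jetstream_subscriber:
    name: order_events
    credentials_path: /etc/nats/credentials.json
    subject: "orders.>"
    durable_name: order_processor
    max_messages: 50        # fetch up to 50 messages per batch (arbitrary value)
    max_ack_pending: 100    # at most 100 messages awaiting acknowledgment
    max_deliver: 4          # give up after 4 delivery attempts
    delay: "1s"             # pause between fetch requests (assumed duration format)
    backoff:                # wait before each redelivery (assumed duration format)
      - "1s"
      - "5s"
      - "30s"
    stream:
      name: ORDERS
      subjects:
        - "orders.>"
```

An ack_timeout could be set in place of backoff when a single fixed redelivery interval is enough. In JetStream, a backoff schedule takes precedence over the acknowledgment wait when both are present.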

Stream options

| Field | Type | Default | Description |
|---|---|---|---|
| name | string | required | Stream name. |
| subjects | list | required | Subject patterns (supports wildcards). |
| create_or_update | bool | false | Create or update stream if it does not match. |
| max_age | duration | | Max message age. |
| max_messages | int | | Max messages in stream. |
| max_bytes | int | | Max stream size in bytes. |
| retention | string | limits | Retention policy: limits, interest, work_queue. |
| discard | string | old | Discard policy: old, new. |
| duplicate_window | duration | | Deduplication window. |
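
As an illustration of how the stream limits combine, the fragment below sketches a size- and age-bounded stream. The limits are arbitrary example values, and the duration strings ("24h", "2m") again assume a Go-style duration syntax that this page does not specify.

```yaml
- nats_jetstream_subscriber:
    name: order_events
    credentials_path: /etc/nats/credentials.json
    subject: "orders.>"
    durable_name: order_processor
    stream:
      name: ORDERS
      subjects:
        - "orders.>"
      create_or_update: true
      retention: limits           # keep messages until a limit below is reached
      max_age: "24h"              # assumed duration format
      max_messages: 1000000       # arbitrary example limit
      max_bytes: 1073741824       # 1 GiB, arbitrary example limit
      discard: old                # when a limit is hit, drop the oldest messages
      duplicate_window: "2m"      # assumed duration format
```

With discard set to old, the stream keeps accepting new messages and evicts the oldest ones once any limit is reached. With discard set to new, JetStream rejects new messages instead.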

Example

flow:
  name: process_orders
  tasks:
    - nats_jetstream_subscriber:
        name: orders
        credentials_path: /etc/nats/credentials.json
        subject: "orders.created"
        durable_name: order_processor
        max_ack_pending: 100
        stream:
          name: ORDERS
          subjects:
            - "orders.>"
          create_or_update: true
          retention: work_queue

    - script:
        name: transform
        code: |
          let order = event.data;
          #{id: order.id, total: order.amount}

    - log:
        name: output