Kafka Integration

Overview

Kafka is a scalable messaging platform. With this integration, FusionAuth sends webhook events (for example, a “new user” event when a new user is created in FusionAuth) to a Kafka topic. You can consume these events from Kafka in addition to, or instead of, receiving the JSON events directly from a configured webhook.

When the Kafka integration is enabled, all webhook events across all tenants will be sent to Kafka with all fields included.

Configuration

The Kafka integration may be enabled using the Integrations API or through the FusionAuth UI by navigating to Settings -> Integrations -> Kafka.

Kafka Configuration

By default, you’ll see properties set for the Kafka producer configuration, including bootstrap.servers, max.block.ms, and request.timeout.ms. The bootstrap.servers property tells FusionAuth where to make the initial connection to your Kafka cluster, while max.block.ms and request.timeout.ms limit, in milliseconds, how long FusionAuth waits when sending an event to Kafka. You can find the complete list of allowed settings for this block in the Kafka producer configuration documentation.
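
Because this block uses Java properties syntax, it can help to sanity-check a configuration before pasting it in. The following Python sketch is an illustration only and is not part of FusionAuth: the helper name parse_producer_properties is hypothetical, and it handles only simple key=value lines (no line continuations, escapes, or ":" separators), which is enough for the configurations on this page.

```python
def parse_producer_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines into a dict (hypothetical helper).

    Blank lines and lines starting with '#' or '!' are skipped. Only the
    first '=' separates key from value, because values such as
    sasl.jaas.config contain '=' themselves.
    """
    props: dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"missing '=' in line: {raw_line!r}")
        props[key.strip()] = value.strip()
    if "bootstrap.servers" not in props:
        raise ValueError("bootstrap.servers is required")
    # These timeouts are given in milliseconds and must be whole numbers.
    for key in ("max.block.ms", "request.timeout.ms"):
        if key in props and not props[key].isdigit():
            raise ValueError(f"{key} must be a whole number of milliseconds")
    return props


config = parse_producer_properties("""
bootstrap.servers=kafka:9092
max.block.ms=5000
request.timeout.ms=2000
""")
print(config["bootstrap.servers"])  # kafka:9092
```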

Specify a topic that already exists in your Kafka cluster and press “Send test event” to confirm that the connection works. Once the test succeeds, don’t forget to press “Save” in the top right to turn on the Kafka integration.

You should see an event similar to the following in your Kafka topic if the test succeeds.

{"createInstant":1667831017070,"id":"4532ba80-9443-4300-a324-3a2193e56c67","message":"You've successfully configured the Kafka Integration for FusionAuth.","type":"test"}
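
If you pipe messages from a console consumer into a script, a check like the following Python sketch can confirm that the test event arrived. It assumes one JSON message per line, as kafka-console-consumer.sh prints them, and relies only on the type field shown above; the function name is hypothetical.

```python
import json


def is_test_event(message: str) -> bool:
    """Return True if a raw Kafka message is FusionAuth's test event."""
    try:
        payload = json.loads(message)
    except json.JSONDecodeError:
        return False
    # The test event is sent unwrapped, with "type" at the top level.
    return isinstance(payload, dict) and payload.get("type") == "test"


sample = (
    '{"createInstant":1667831017070,"id":"4532ba80-9443-4300-a324-3a2193e56c67",'
    '"message":"You\'ve successfully configured the Kafka Integration for FusionAuth.",'
    '"type":"test"}'
)
print(is_test_event(sample))  # True
```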

Example Configuration for Docker Compose

If you’re running Kafka alongside FusionAuth (for example, from the same docker-compose.yml file), the only default setting you need to change is bootstrap.servers, which tells FusionAuth where to find the Kafka bootstrap server on the Docker network.

If your Docker Compose file is as follows:

Example docker-compose.yml with Kafka

version: '3'

services:
  db:
    image: postgres:16.0-bookworm
    environment:
      PGDATA: /var/lib/postgresql/data/pgdata
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    healthcheck:
      test: [ "CMD-SHELL", "pg_isready -U postgres" ]
      interval: 5s
      timeout: 5s
      retries: 5
    networks:
      - db_net
    restart: unless-stopped
    volumes:
      - db_data:/var/lib/postgresql/data

  search:
    image: opensearchproject/opensearch:2.11.0
    environment:
      cluster.name: fusionauth
      discovery.type: single-node
      node.name: search
      plugins.security.disabled: true
      bootstrap.memory_lock: true
      OPENSEARCH_JAVA_OPTS: ${OPENSEARCH_JAVA_OPTS}
    healthcheck:
      interval: 10s
      retries: 80
      test: curl --write-out 'HTTP %{http_code}' --fail --silent --output /dev/null http://localhost:9200/
    restart: unless-stopped
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    ports:
      - 9200:9200 # REST API
      - 9600:9600 # Performance Analyzer
    volumes:
      - search_data:/usr/share/opensearch/data
    networks:
      - search_net

  fusionauth:
    image: fusionauth/fusionauth-app:latest
    depends_on:
      - db
      - search
      - kafka
      - zookeeper
    environment:
      DATABASE_URL: jdbc:postgresql://db:5432/fusionauth
      DATABASE_ROOT_USERNAME: ${POSTGRES_USER}
      DATABASE_ROOT_PASSWORD: ${POSTGRES_PASSWORD}
      DATABASE_USERNAME: ${DATABASE_USERNAME}
      DATABASE_PASSWORD: ${DATABASE_PASSWORD}
      FUSIONAUTH_APP_MEMORY: ${FUSIONAUTH_APP_MEMORY}
      FUSIONAUTH_APP_RUNTIME_MODE: ${FUSIONAUTH_APP_RUNTIME_MODE}
      FUSIONAUTH_APP_URL: http://fusionauth:9011
      SEARCH_SERVERS: http://search:9200
      SEARCH_TYPE: elasticsearch
      FUSIONAUTH_APP_KICKSTART_FILE: ${FUSIONAUTH_APP_KICKSTART_FILE}
    networks:
      - db_net
      - search_net
    restart: unless-stopped
    ports:
      - 9011:9011
    volumes:
      - fusionauth_config:/usr/local/fusionauth/config
      - ./kickstart:/usr/local/fusionauth/kickstart

  zookeeper:
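    image: confluentinc/cp-zookeeper:7.5.0 # pinned: 7.x images still support ZooKeeper mode
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
    networks:
      - db_net

  kafka:
    image: confluentinc/cp-kafka:7.5.0 # pinned: Confluent Platform 8.0+ removed ZooKeeper support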
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      db_net:
        aliases:
          - kafka
 
networks:
  db_net:
    driver: bridge
  search_net:
    driver: bridge

volumes:
  db_data:
  fusionauth_config:
  search_data:

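Then enter the following configuration in the FusionAuth UI to configure the Kafka integration. The host name kafka resolves because the fusionauth and kafka services share the db_net network and the broker advertises itself as kafka:9092.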

bootstrap.servers=kafka:9092
max.block.ms=5000
request.timeout.ms=2000

Example Configuration for a Remote Managed Kafka Integration

If you’re using a managed Kafka service that runs on a different server from your FusionAuth installation, you’ll also need to specify credentials for connecting to the remote Kafka cluster. Your Kafka hosting provider should supply the exact configuration; look for “Producer configuration” or similar. It should look similar to the following, with your own host name and credentials.

bootstrap.servers=pkc-6ojv2.us-west4.gcp.your-kafka-provider.cloud:9092
client.dns.lookup=use_all_dns_ips
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<your-api-key>' password='<your-api-secret>';
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
session.timeout.ms=45000
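
Rather than pasting credentials by hand, you might generate this configuration from environment variables. The following Python sketch assumes hypothetical variable names KAFKA_API_KEY and KAFKA_API_SECRET and reproduces the other settings from the example above. It rejects values containing a single quote, because it wraps them in single quotes without escaping.

```python
import os


def build_plain_jaas_config(username: str, password: str) -> str:
    """Build a sasl.jaas.config value for SASL/PLAIN authentication."""
    for value in (username, password):
        # Values are wrapped in single quotes below and are not escaped.
        if "'" in value:
            raise ValueError("credentials must not contain a single quote")
    return (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f"username='{username}' password='{password}';"
    )


def remote_producer_properties(bootstrap_servers: str) -> str:
    """Render the remote producer configuration shown above (sketch)."""
    jaas = build_plain_jaas_config(
        os.environ["KAFKA_API_KEY"],     # hypothetical variable name
        os.environ["KAFKA_API_SECRET"],  # hypothetical variable name
    )
    lines = [
        f"bootstrap.servers={bootstrap_servers}",
        "client.dns.lookup=use_all_dns_ips",
        f"sasl.jaas.config={jaas}",
        "sasl.mechanism=PLAIN",
        "security.protocol=SASL_SSL",
        "session.timeout.ms=45000",
    ]
    return "\n".join(lines)
```

For example, remote_producer_properties("pkc-6ojv2.us-west4.gcp.your-kafka-provider.cloud:9092") returns text you can paste into the FusionAuth UI; the same text also works as the consumer.properties file described under troubleshooting.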

Event Types and Configuration

Once FusionAuth is connected to Kafka, each event it sends arrives in your topic as a JSON message. Events generally contain the following fields:

  • id: A unique Id for that event that can be used for deduplication.
  • createInstant: A timestamp indicating when the event occurred.
  • type: The kind of event that occurred.
  • info: A map of extra information about the event and its source, such as IP address and device information.

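Other fields applicable to the event may also be included. You can find the full schema for each event in the webhook events documentation. In the examples below, these fields are nested inside a top-level event object, while the test event is sent without that wrapper.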
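
Because Kafka messages can be delivered more than once, a consumer typically unwraps each message and uses id to skip duplicates. The following Python sketch shows one way to do that, assuming messages shaped like the examples on this page (most events nested under a top-level event key, the test event unwrapped). The class and function names and the short sample Ids are made up, and the in-memory set stands in for whatever durable store a real consumer would use.

```python
import json
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FusionAuthEvent:
    id: str
    type: str
    create_instant: int
    info: dict[str, Any] = field(default_factory=dict)
    raw: dict[str, Any] = field(default_factory=dict)


def parse_event(message: str) -> FusionAuthEvent:
    payload = json.loads(message)
    # Most events are nested under "event"; the test event is not.
    body = payload.get("event", payload)
    return FusionAuthEvent(
        id=body["id"],
        type=body["type"],
        create_instant=body["createInstant"],
        info=body.get("info", {}),
        raw=body,
    )


class Deduplicator:
    """Drop events whose id has already been seen (in memory only)."""

    def __init__(self) -> None:
        self._seen: set[str] = set()

    def accept(self, event: FusionAuthEvent) -> bool:
        if event.id in self._seen:
            return False
        self._seen.add(event.id)
        return True


messages = [
    '{"createInstant":1,"id":"a1","type":"test"}',
    '{"event":{"createInstant":2,"id":"b2","type":"user.create","tenantId":"t1"}}',
    '{"event":{"createInstant":2,"id":"b2","type":"user.create","tenantId":"t1"}}',
]
dedup = Deduplicator()
for message in messages:
    event = parse_event(message)
    if dedup.accept(event):
        print(event.type, event.id)  # prints "test a1", then "user.create b2"
```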

Events can be categorized into two broad types:

  • System events, which include audit log events and event log data.
  • Tenant-based events, which include detailed information about user creation, removal, or changes.

By default, system events are sent to Kafka without further configuration. Tenant events, however, are sent only when a webhook is set up and active, and the tenant events sent to Kafka follow the event configuration of that webhook and tenant.

If you don’t already have a webhook configured and want to use Kafka for tenant-level events, we recommend setting up a no-op webhook receiver that accepts the incoming POST request, discards it, and returns a 200 OK status. You can then point a dummy webhook configuration at that receiver to control which tenant-level events reach Kafka. To create such a receiver, you can use a low-code platform such as Pipedream or Zapier, or roll your own.
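
If you roll your own receiver, it can be very small. The following Python sketch uses only the standard library http.server module: it reads and discards the body of any POST and replies 200 OK. The default port 8000 is an arbitrary assumption; the receiver must be reachable from FusionAuth, and you may want to restrict who can call it. It is a sketch, not a hardened service.

```python
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


class NoOpWebhookHandler(BaseHTTPRequestHandler):
    """Accept any POST, discard the body, and return 200 OK."""

    def do_POST(self) -> None:
        length = int(self.headers.get("Content-Length", 0))
        # Read the body so the request is fully consumed, then drop it.
        self.rfile.read(length)
        self.send_response(200)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def log_message(self, format: str, *args) -> None:
        # Silence the default per-request logging to stderr.
        pass


def make_server(port: int = 8000) -> ThreadingHTTPServer:
    """Create the receiver; call serve_forever() on the result to run it."""
    return ThreadingHTTPServer(("0.0.0.0", port), NoOpWebhookHandler)
```

Calling make_server().serve_forever() starts the receiver on port 8000; you would then use its URL as the dummy webhook’s endpoint.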

Example Events Sent to Kafka

After creating the integration and using FusionAuth, your Kafka topic might look similar to the following, which shows events for:

  1. Sending the initial test event.
  2. The audit log event for creating a new user with the email address newuser@example.com. This is a system-level event.
  3. The tenant-level event for creating the above user.
  4. An error event raised because FusionAuth couldn’t connect to an SMTP server, so the setup password email couldn’t be sent to the new user. This is a system-level event.

{"createInstant":1667833973280,"id":"e6a4f780-02da-4b5a-8b04-94d2a49ea369","message":"You've successfully configured the Kafka Integration for FusionAuth.","type":"test"}

{"event":{"auditLog":{"id":38,"insertInstant":1667834917902,"insertUser":"test@example.com","message":"Created user with Id [3cbb85e7-ebf8-4c92-bc75-7ca8db4399db], name [null] and loginId [newuser@example.com]","reason":"FusionAuth User Interface"},"createInstant":1667834917903,"id":"4b81d279-24c7-463b-847a-0cecaaf113a0","info":{"ipAddress":"192.168.16.1","userAgent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15"},"type":"audit-log.create"}}

{"event":{"createInstant":1667834917903,"id":"0c11627f-9461-4a00-8156-00ff6c3d68d3","info":{"ipAddress":"172.22.0.1","userAgent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15"},"tenantId":"2671a63f-084c-4434-9465-fde65b8845ee","type":"user.create","user":{"active":true,"connectorId":"e3306678-a53a-4964-9040-1c96f36dda72","email":"newuser@example.com","id":"7e512f97-79f7-42e5-891f-a2383ed3460c","insertInstant":1667834917902,"lastLoginInstant":1667834917902,"lastUpdateInstant":1667834917902,"passwordChangeRequired":false,"passwordLastUpdateInstant":1667834917902,"tenantId":"2671a63f-084c-4434-9465-fde65b8845ee","twoFactor":{},"usernameStatus":"ACTIVE","verified":true}}}

{"event":{"createInstant":1667834917916,"eventLog":{"id":34,"insertInstant":1667834917913,"message":"Async Email Send exception occurred.\n\nTemplate Id: 3e6462be-178c-499f-92c9-3643ccca8ced\nTemplate Name: [FusionAuth Default] Setup Password\nTenant Id: 6825d48e-4df4-f83e-1055-f1d42e363749\nAddressed to: newuser@example.com\n\nCause:\ncom.sun.mail.util.MailConnectException : Message: Couldn't connect to host, port: localhost, 25; timeout -1","type":"Error"},"id":"44ca31e5-967c-4b5c-8ff4-1ee51d73999a","type":"event-log.create"}}
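
To route system and tenant events separately, a consumer needs some way to tell them apart. In the examples above, the tenant-level user.create event carries a top-level tenantId, while the system-level audit-log.create and event-log.create events do not. The following Python sketch uses that observation as a heuristic; it is an assumption drawn from these samples, not a documented rule, so check it against the webhook events documentation for the event types you rely on.

```python
import json


def classify(message: str) -> str:
    """Return "test", "tenant", or "system" for a raw FusionAuth Kafka message."""
    payload = json.loads(message)
    # Events are nested under "event", except the unwrapped test event.
    body = payload.get("event", payload)
    if body.get("type") == "test":
        return "test"
    # Heuristic from the samples: tenant-level events carry a top-level tenantId.
    if "tenantId" in body:
        return "tenant"
    return "system"
```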

Troubleshooting FusionAuth’s Kafka Integration

Kafka is powerful but complicated software, and running a production Kafka setup that consumes all of your FusionAuth events requires Kafka expertise. If you get stuck integrating Kafka, try the following.

Troubleshooting the FusionAuth-Side of the Integration

  • Check that you included a topic when adding the integration.
  • Check that you saved the integration after configuring and testing it.
  • Check the FusionAuth logs for errors relating to Kafka.
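  • If you’re using Docker Compose, check that FusionAuth and Kafka share a Docker network and that bootstrap.servers uses the Kafka service name (kafka:9092 in the example above). Port mappings for Kafka and ZooKeeper matter only when you connect from your host machine, for example with Kafka’s command-line tools.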
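
To separate network problems from Kafka configuration problems, a plain TCP connection test against each host:port in bootstrap.servers can help. The following Python sketch only shows whether something is listening on the port; it does not speak the Kafka protocol, check credentials, or follow the broker’s advertised listeners. Run it from a container on the same Docker network as FusionAuth to test the path FusionAuth actually uses; the function name is hypothetical.

```python
import socket


def check_bootstrap_servers(bootstrap_servers: str, timeout: float = 3.0) -> dict[str, bool]:
    """Try a TCP connection to each host:port in a bootstrap.servers value."""
    results: dict[str, bool] = {}
    for entry in bootstrap_servers.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing comma
        host, _, port = entry.rpartition(":")
        try:
            with socket.create_connection((host, int(port)), timeout=timeout):
                results[entry] = True
        except (OSError, ValueError):
            # OSError covers DNS failures, refusals, and timeouts;
            # ValueError covers a missing or non-numeric port.
            results[entry] = False
    return results
```

For example, check_bootstrap_servers("kafka:9092") returns {"kafka:9092": True} when the Kafka container accepts connections on that name and port.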

Troubleshooting your Kafka Installation

It can be useful to run quick commands directly against your Kafka cluster to create topics and view events. Download a Kafka release from the official Apache Kafka website and extract it on your local machine; its bin directory contains utility scripts for interacting directly with your Kafka cluster.

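If you’re using a Docker Compose setup locally, add an entry to your hosts file (for example, /etc/hosts on macOS and Linux) that maps kafka to 127.0.0.1. The broker advertises itself as kafka:9092, so tools on your host machine must be able to resolve that name; the 9092:9092 port mapping then routes the connection to the container.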

127.0.0.1 kafka

You can then create a topic (for example, fa-events) that you can use in FusionAuth by running the following.

bin/kafka-topics.sh --create --topic fa-events --bootstrap-server kafka:9092

You can start a consumer that watches for new events with the following command. If everything is correctly set up, you’ll see events streamed to this consumer and logged to your shell as you carry out actions in FusionAuth.

bin/kafka-console-consumer.sh --topic fa-events --from-beginning --bootstrap-server kafka:9092

To connect to a remote Kafka cluster instead, create a local file called consumer.properties with the credentials for your remote cluster.

bootstrap.servers=pkc-6ojv2.us-west4.gcp.your-kafka-provider.cloud:9092
client.dns.lookup=use_all_dns_ips
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<your-api-key>' password='<your-api-secret>';
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
session.timeout.ms=45000

Then run the scripts above against the remote bootstrap server, passing the file with the --consumer.config flag of kafka-console-consumer.sh (kafka-topics.sh accepts the same file through its --command-config flag). For example, use the following to see events posted to your fa-events topic in a remote cluster.

bin/kafka-console-consumer.sh --bootstrap-server pkc-6ojv2.us-west4.gcp.your-kafka-provider.cloud:9092 --consumer.config consumer.properties --topic fa-events