    Kafka Integration

    Overview

    Kafka is a scalable messaging platform. This integration lets you consume webhook events from a Kafka topic in addition to, or instead of, consuming the JSON events directly from a configured webhook. For example, when a new user is created in FusionAuth, a "new user" event can be sent to a Kafka topic.

    When the Kafka integration is enabled, all webhook events across all tenants will be sent to Kafka with all fields included.

    Configuration

    The Kafka integration may be enabled using the Integrations API or through the FusionAuth UI by navigating to Settings → Integrations → Kafka.

    Kafka Configuration

    By default, you’ll see properties set for the Kafka Producer configuration, including bootstrap.servers, max.block.ms, and request.timeout.ms. These tell FusionAuth how to make the initial connection to your Kafka cluster and how long it can wait when sending information to Kafka. You can find a complete list of the properties allowed in this block in the Kafka configuration documentation.

    Specify a topic that you’ve already created in your Kafka cluster and press "Send test event" to make sure that the connection is working as expected. After seeing that it succeeded, don’t forget to press "Save" in the top right to turn on the Kafka integration.

    You should see an event similar to the following in your Kafka topic if the test succeeds.

    
    {"createInstant":1667831017070,"id":"4532ba80-9443-4300-a324-3a2193e56c67","message":"You've successfully configured the Kafka Integration for FusionAuth.","type":"test"}
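    As a quick sanity check on the consumer side, the test event can be decoded with a few lines of Python. This is a minimal sketch, not part of FusionAuth: the function name parse_test_event and the added createdAt field are hypothetical, and it assumes createInstant is expressed in milliseconds since the Unix epoch.

```python
import json
from datetime import datetime, timezone

# The test event shown above, copied verbatim.
RAW_EVENT = '{"createInstant":1667831017070,"id":"4532ba80-9443-4300-a324-3a2193e56c67","message":"You\'ve successfully configured the Kafka Integration for FusionAuth.","type":"test"}'


def parse_test_event(raw: str) -> dict:
    """Decode a JSON event and add a human-readable UTC timestamp."""
    event = json.loads(raw)
    # Assumption: createInstant is milliseconds since the Unix epoch.
    created = datetime.fromtimestamp(event["createInstant"] / 1000, tz=timezone.utc)
    event["createdAt"] = created.isoformat()  # Hypothetical convenience field.
    return event


if __name__ == "__main__":
    parsed = parse_test_event(RAW_EVENT)
    print(parsed["type"], parsed["createdAt"])
```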

    Example Configuration for Docker Compose

    If you’re running Kafka alongside FusionAuth (for example, from the same docker-compose.yaml file), the only thing you need to change in the default configuration is to show FusionAuth where to find the Kafka bootstrap server on the network.

    If your Docker Compose file is as follows:

    
    version: '3'
    
    services:
      db:
        image: postgres:12.9
        environment:
          PGDATA: /var/lib/postgresql/data/pgdata
          POSTGRES_USER: ${POSTGRES_USER}
          POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
        healthcheck:
          test: [ "CMD-SHELL", "pg_isready -U postgres" ]
          interval: 5s
          timeout: 5s
          retries: 5
        networks:
          - db_net
        restart: unless-stopped
        volumes:
          - db_data:/var/lib/postgresql/data
    
      search:
        image: docker.elastic.co/elasticsearch/elasticsearch:7.17.0
        container_name: search
        environment:
          cluster.name: fusionauth
          bootstrap.memory_lock: "true"
          discovery.type: single-node
          ES_JAVA_OPTS: ${ES_JAVA_OPTS}
        healthcheck:
          test: [ "CMD", "curl",  "--fail" ,"--write-out", "'HTTP %{http_code}'", "--silent", "--output", "/dev/null", "http://localhost:9200/" ]
          interval: 5s
          timeout: 5s
          retries: 5
        networks:
          - search_net
        restart: unless-stopped
        ulimits:
          memlock:
            soft: -1
            hard: -1
        volumes:
          - search_data:/usr/share/elasticsearch/data
    
      fusionauth:
        image: fusionauth/fusionauth-app:latest
        depends_on:
          - db
          - search
          - kafka
          - zookeeper
        environment:
          DATABASE_URL: jdbc:postgresql://db:5432/fusionauth
          DATABASE_ROOT_USERNAME: ${POSTGRES_USER}
          DATABASE_ROOT_PASSWORD: ${POSTGRES_PASSWORD}
          DATABASE_USERNAME: ${DATABASE_USERNAME}
          DATABASE_PASSWORD: ${DATABASE_PASSWORD}
          FUSIONAUTH_APP_MEMORY: ${FUSIONAUTH_APP_MEMORY}
          FUSIONAUTH_APP_RUNTIME_MODE: development
          FUSIONAUTH_APP_URL: http://fusionauth:9011
          SEARCH_SERVERS: http://search:9200
          SEARCH_TYPE: elasticsearch
    
        networks:
          - db_net
          - search_net
        restart: unless-stopped
        ports:
          - 9011:9011
        volumes:
          - fusionauth_config:/usr/local/fusionauth/config
    
      zookeeper:
        image: confluentinc/cp-zookeeper:latest
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
        ports:
          - 2181:2181
        networks:
          - db_net
    
      kafka:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        ports:
          - 9092:9092
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        networks:
          db_net:
            aliases:
              - kafka
    
    networks:
      db_net:
        driver: bridge
      search_net:
        driver: bridge
    
    volumes:
      db_data:
      fusionauth_config:
      search_data:

    Then you would input the following configuration in the FusionAuth UI to configure the Kafka integration.

    
    bootstrap.servers=kafka:9092
    max.block.ms=5000
    request.timeout.ms=2000

    Example Configuration for a Remote Managed Kafka Integration

    If you’re using a managed service for Kafka that runs on a different server than your FusionAuth installation, you’ll also need to specify credentials for connecting to the remote Kafka instance. You should be able to get the exact configuration you need from your Kafka hosting provider by looking for "Producer configuration" or similar. It should look similar to the following.

    
    bootstrap.servers=pkc-6ojv2.us-west4.gcp.your-kafka-provider.cloud:9092
    client.dns.lookup=use_all_dns_ips
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='ZN6LJ5UHSXZLW3LR'   password='M9T8b85OPspFAS37Do5Baq7jIS+hl7h7bY8MRrfVff5lz8xeCwea7zB5AC3nKXUD';
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    session.timeout.ms=45000

    Event Types and Configuration

    After successfully connecting to a Kafka instance, you’ll be notified of each event that happens in FusionAuth. Events will generally contain fields for:

    • id: A unique Id for that event that can be used for deduplication.

    • createInstant: A timestamp indicating when the event occurred.

    • type: The kind of event that occurred.

    • info: A map of extra information about the event and its source, such as IP address and device information.

    Other fields applicable to the event may also be included. You can find the full schema for each event in the webhook events documentation.
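    Because each event carries a unique id, a consumer can use it to skip messages it has already processed. The following is a minimal in-memory sketch of that idea; the class name EventDeduplicator and the max_ids limit are hypothetical, and a production consumer would more likely keep seen ids in a shared store. It relies on the shape of the sample events on this page, where most events are wrapped in an "event" object and the test event is not.

```python
import json
from collections import OrderedDict


class EventDeduplicator:
    """Remember recently seen event ids and flag repeats (hypothetical helper)."""

    def __init__(self, max_ids: int = 10_000) -> None:
        self._seen: "OrderedDict[str, None]" = OrderedDict()
        self._max_ids = max_ids

    def is_new(self, raw: str) -> bool:
        """Return True the first time an event id is seen, False on repeats."""
        payload = json.loads(raw)
        # Most sample events are wrapped in an "event" envelope; the test event is not.
        event = payload.get("event", payload)
        event_id = event["id"]
        if event_id in self._seen:
            return False
        self._seen[event_id] = None
        # Evict the oldest id so memory use stays bounded.
        if len(self._seen) > self._max_ids:
            self._seen.popitem(last=False)
        return True
```

    Calling is_new on the raw value of each Kafka message and skipping messages for which it returns False gives at-most-once handling per id within the remembered window.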

    Events can be categorized into two broad types:

    • System events, which include audit log events and event log data.

    • Tenant-based events, which include detailed information about user creation, removal, or changes.

    By default, system events are sent to Kafka without further configuration. Tenant events, however, depend on a webhook being set up and active; the events sent to Kafka then follow the event configuration for that webhook and tenant. If you don’t already have a webhook configured and want to use Kafka for tenant-level events, we recommend setting up a no-op webhook receiver that accepts the incoming POST request, discards it, and returns a 200 OK status. You can then use this placeholder webhook configuration to control which tenant-level events reach Kafka. To create such a receiver, you can use a low-code platform such as Pipedream or Zapier, or roll your own.
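    If you choose to roll your own receiver, one possible shape is a tiny HTTP server built on Python’s standard library. This is a sketch under stated assumptions rather than a recommended deployment: the names NoOpWebhookHandler and make_server are hypothetical, port 8000 is an arbitrary choice, and it does no authentication or TLS.

```python
from http.server import BaseHTTPRequestHandler, HTTPServer


class NoOpWebhookHandler(BaseHTTPRequestHandler):
    """Accept any POST, discard the body, and reply 200 OK."""

    def do_POST(self) -> None:
        length = int(self.headers.get("Content-Length", 0))
        self.rfile.read(length)  # Read and discard the event payload.
        self.send_response(200)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def log_message(self, format: str, *args) -> None:
        pass  # Stay quiet; the events are consumed from Kafka instead.


def make_server(host: str = "0.0.0.0", port: int = 8000) -> HTTPServer:
    # Port 8000 is an arbitrary choice for this sketch.
    return HTTPServer((host, port), NoOpWebhookHandler)


if __name__ == "__main__":
    make_server().serve_forever()
```

    You would then point the webhook URL in FusionAuth at wherever this process is reachable from the FusionAuth server.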

    Example Events Sent to Kafka

    After creating the integration and using FusionAuth, your Kafka topic might look similar to the following, which shows events for:

    1. Sending the initial test event.

    2. The audit log event for creating a new user with the email address newuser@example.com. This is a system-level event.

    3. The tenant-level event for creating the above user.

    4. An error event, generated because the SMTP integration isn’t working and the setup password email couldn’t be sent to the new user. This is a system-level event.

    
    {"createInstant":1667833973280,"id":"e6a4f780-02da-4b5a-8b04-94d2a49ea369","message":"You've successfully configured the Kafka Integration for FusionAuth.","type":"test"}
    
    {"event":{"auditLog":{"id":38,"insertInstant":1667834917902,"insertUser":"test@example.com","message":"Created user with Id [3cbb85e7-ebf8-4c92-bc75-7ca8db4399db], name [null] and loginId [newuser@example.com]","reason":"FusionAuth User Interface"},"createInstant":1667834917903,"id":"4b81d279-24c7-463b-847a-0cecaaf113a0","info":{"ipAddress":"192.168.16.1","userAgent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15"},"type":"audit-log.create"}}
    
    {"event":{"createInstant":1667834917903,"id":"0c11627f-9461-4a00-8156-00ff6c3d68d3","info":{"ipAddress":"172.22.0.1","userAgent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15"},"tenantId":"2671a63f-084c-4434-9465-fde65b8845ee","type":"user.create","user":{"active":true,"connectorId":"e3306678-a53a-4964-9040-1c96f36dda72","email":"newuser@example.com","id":"7e512f97-79f7-42e5-891f-a2383ed3460c","insertInstant":1667834917902,"lastLoginInstant":1667834917902,"lastUpdateInstant":1667834917902,"passwordChangeRequired":false,"passwordLastUpdateInstant":1667834917902,"tenantId":"2671a63f-084c-4434-9465-fde65b8845ee","twoFactor":{},"usernameStatus":"ACTIVE","verified":true}}}
    
    {"event":{"createInstant":1667834917916,"eventLog":{"id":34,"insertInstant":1667834917913,"message":"Async Email Send exception occurred.\n\nTemplate Id: 3e6462be-178c-499f-92c9-3643ccca8ced\nTemplate Name: [FusionAuth Default] Setup Password\nTenant Id: 6825d48e-4df4-f83e-1055-f1d42e363749\nAddressed to: newuser@example.com\n\nCause:\ncom.sun.mail.util.MailConnectException : Message: Couldn't connect to host, port: localhost, 25; timeout -1","type":"Error"},"id":"44ca31e5-967c-4b5c-8ff4-1ee51d73999a","type":"event-log.create"}}
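    A consumer often needs to route these messages differently depending on whether they are system-level or tenant-level. The sketch below sorts a raw message value into one of four buckets based only on what the sample events above show: the test event has type "test", the two system-level samples have the types audit-log.create and event-log.create, and the tenant-level sample carries a tenantId. The function name classify and the "unknown" fallback are hypothetical.

```python
import json

# Event types that appear as system-level events in the samples above.
SYSTEM_EVENT_TYPES = {"audit-log.create", "event-log.create"}


def classify(raw: str) -> str:
    """Return "test", "system", "tenant", or "unknown" for one message value."""
    payload = json.loads(raw)
    # Most events are wrapped in an "event" envelope; the test event is not.
    event = payload.get("event", payload)
    event_type = event.get("type")
    if event_type == "test":
        return "test"
    if event_type in SYSTEM_EVENT_TYPES:
        return "system"
    # Assumption: an event that carries a tenantId is tenant-level.
    if "tenantId" in event:
        return "tenant"
    return "unknown"
```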

    Troubleshooting FusionAuth’s Kafka Integration

    Kafka is powerful but complicated software, and running a production Kafka setup that consumes all of your FusionAuth events requires Kafka expertise. If you get stuck integrating Kafka, try the following.

    Troubleshooting the FusionAuth Side of the Integration

    • Check that you included a topic when adding the integration.

    • Check that you saved the integration after configuring and testing it.

    • Check the FusionAuth logs for errors relating to Kafka.

    • If you’re using Docker Compose, check that you’ve correctly mapped the ports for both Kafka and ZooKeeper and correctly configured a Docker network so that FusionAuth can connect to Kafka.
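    To narrow down a connectivity problem, it can help to confirm that the bootstrap server accepts TCP connections at all. The following sketch uses only Python’s standard library; the function names are hypothetical, and it only proves reachability from the machine or container where you run it, which may differ from what the FusionAuth container sees. It does not speak the Kafka protocol or check credentials.

```python
import socket


def parse_bootstrap_servers(value: str) -> list[tuple[str, int]]:
    """Split a bootstrap.servers value such as "kafka:9092,other:9092"."""
    servers = []
    for entry in value.split(","):
        host, _, port = entry.strip().rpartition(":")
        servers.append((host, int(port)))
    return servers


def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    for host, port in parse_bootstrap_servers("kafka:9092"):
        state = "reachable" if can_connect(host, port) else "unreachable"
        print(f"{host}:{port} is {state}")
```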

    Troubleshooting Your Kafka Installation

    It can be useful to run quick commands directly against your Kafka cluster to create topics and log events. You can download a version of Kafka from the official Kafka website and extract it to your local machine. This will give you some utility scripts to directly interact with your Kafka cluster.

    If you’re using a Docker Compose setup locally, add an entry to your hosts file to map kafka to 127.0.0.1.

    
    127.0.0.1 kafka

    You can then create a topic (for example, fa-events) that you can use in FusionAuth by running the following.

    
    bin/kafka-topics.sh --create --topic fa-events --bootstrap-server kafka:9092

    You can set a consumer to watch for new events with the following command. If everything is correctly set up, you’ll see events streamed to this consumer and logged to your shell as you carry out actions in FusionAuth.

    
    bin/kafka-console-consumer.sh --topic fa-events --from-beginning --bootstrap-server kafka:9092

    Or if you need to talk to a remote Kafka cluster, you can create a file locally called consumer.properties with credentials for your remote cluster.

    
    bootstrap.servers=pkc-6ojv2.us-west4.gcp.confluent.cloud:9092
    client.dns.lookup=use_all_dns_ips
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='ZN6LJ5UHSXWLW2LR'   password='M9T8b75OPspFAS37Do5Baq7jIS+hi7h7bY8MRrfVff5lz8xeCweaRTO8GD3nKXUD';
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    session.timeout.ms=45000

    Then run the consumer script above, passing in the file with the --consumer.config flag along with the remote bootstrap server. For example, use the following to see events posted to your fa-events topic in a remote cluster.

    
    bin/kafka-console-consumer.sh --bootstrap-server pkc-6ojv2.us-west4.gcp.confluent.cloud:9092 --consumer.config consumer.properties --topic fa-events
