Connect to Open Catalog from Apache Kafka Connect
You can use the Apache Iceberg Sink Connector for Kafka Connect to write data from Kafka topics to Iceberg tables managed by Open Catalog. This page describes how to set up Kafka Connect to authenticate with Dremio using the Dremio Auth Manager for Apache Iceberg.
Prerequisites
- Java 17 or later for the Kafka Connect runtime.
- Docker and Docker Compose installed and running.
- The Iceberg Sink Connector for Kafka Connect. Check available versions on Confluent Hub.
- The `authmgr-oauth2-standalone` JAR from the Dremio Auth Manager for Apache Iceberg releases. You must use the `authmgr-oauth2-standalone` variant (not `authmgr-oauth2-runtime`) because the standalone variant does not relocate Iceberg packages, which is required for compatibility with the Iceberg Kafka Sink Connector.
Setting Up Kafka and Kafka Connect
Kafka Connect runs as a separate process that connects to a Kafka broker. The following steps use Docker Compose to run both a Kafka broker and a Kafka Connect worker.
Step 1: Create the Project Directory
Create the project directory:

```shell
mkdir -p kafka-iceberg
cd kafka-iceberg
```
Step 2: Create the Dockerfile for Kafka Connect
Create a Dockerfile that extends the Confluent Kafka Connect image with the Iceberg Sink Connector and the Dremio Auth Manager JAR:
```dockerfile
FROM confluentinc/cp-kafka-connect:8.0.0

# Install the Iceberg Sink Connector
RUN confluent-hub install --no-prompt iceberg/iceberg-kafka-connect:1.9.2

# Download the Dremio Auth Manager standalone JAR
ADD --chown=appuser:appuser --chmod=644 \
    https://github.com/dremio/iceberg-auth-manager/releases/download/authmgr-1.0.0/authmgr-oauth2-standalone-1.0.0.jar \
    /usr/share/confluent-hub-components/iceberg-iceberg-kafka-connect/lib/authmgr-oauth2-standalone-1.0.0.jar
```
Step 3: Create the Docker Compose File
Create a docker-compose.yml file that runs both a Kafka broker and the Kafka Connect worker:
```yaml
services:
  kafka:
    image: confluentinc/cp-kafka:8.0.0
    ports:
      - "9092:9092"
    environment:
      CLUSTER_ID: "kafka-iceberg-cluster"
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_NODE_ID: "1"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093"
      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9093,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9093,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    healthcheck:
      test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server localhost:9093"]
      interval: 10s
      timeout: 5s
      retries: 5
  connect:
    build: .
    ports:
      - "8083:8083"
    depends_on:
      kafka:
        condition: service_healthy
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9093"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_REST_PORT: "8083"
      CONNECT_GROUP_ID: "iceberg-connect"
      CONNECT_CONFIG_STORAGE_TOPIC: "iceberg-connect-config"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_TOPIC: "iceberg-connect-offsets"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: "iceberg-connect-status"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_PLUGIN_PATH: "/usr/share/confluent-hub-components"
    healthcheck:
      test: ["CMD-SHELL", "curl -sf http://localhost:8083/ || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 10
      start_period: 60s
```
Step 4: Build and Start the Cluster
Build and start the Kafka cluster:

```shell
docker compose build
docker compose up -d
```
Wait for Kafka Connect to be ready by checking its REST API:
```shell
curl -s http://localhost:8083/ | jq .
```
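Before deploying a connector, the source topic needs some records to consume. As a quick smoke test, the sketch below generates a few sample JSON records and pipes them to `kafka-console-producer` inside the broker container from the compose file. The topic name `events` and the record fields are assumptions chosen for illustration; substitute your own topic and schema.

```python
import json
import subprocess

def make_records(n):
    """Build n sample JSON records matching a simple, hypothetical event schema."""
    return [json.dumps({"id": i, "event": "click", "ts": 1700000000 + i})
            for i in range(n)]

def produce(topic, records):
    """Pipe newline-delimited JSON into kafka-console-producer running
    inside the 'kafka' service defined in docker-compose.yml."""
    payload = "\n".join(records) + "\n"
    subprocess.run(
        ["docker", "compose", "exec", "-T", "kafka",
         "kafka-console-producer", "--bootstrap-server", "localhost:9093",
         "--topic", topic],
        input=payload.encode(), check=True)

if __name__ == "__main__":
    # Run from the kafka-iceberg project directory so docker compose
    # resolves the right services.
    produce("events", make_records(5))
```

The producer connects to port 9093 because that is the broker's internal PLAINTEXT listener in the compose file above.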
Deploying a Connector
Connectors are deployed by submitting a JSON configuration file to the Kafka Connect REST API. The sections below provide example configurations for PAT and identity provider (IdP) authentication. To deploy a connector:

1. Save the desired configuration to a file, for example `connector-config.json`.
2. Replace the placeholder values in the file with your actual values.
3. Submit the configuration to the Kafka Connect REST API:

   ```shell
   curl -X POST http://localhost:8083/connectors \
     -H "Content-Type: application/json" \
     -d @connector-config.json
   ```

4. Verify the connector status:

   ```shell
   curl -s http://localhost:8083/connectors/iceberg-sink/status | jq .
   ```
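The connector and its tasks move to the `RUNNING` state asynchronously after the POST returns, so a single status check can race the startup. A small polling sketch using only the standard library (the URL and connector name match the examples on this page; the retry budget is an arbitrary choice):

```python
import json
import time
import urllib.request

STATUS_URL = "http://localhost:8083/connectors/iceberg-sink/status"

def all_tasks_running(status):
    """True when the connector and every task report state RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    return bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)

def wait_until_running(url=STATUS_URL, attempts=30, delay=2.0):
    """Poll the Kafka Connect status endpoint until the connector is up."""
    for _ in range(attempts):
        try:
            with urllib.request.urlopen(url) as resp:
                if all_tasks_running(json.load(resp)):
                    return True
        except OSError:
            pass  # Connect worker may still be starting
        time.sleep(delay)
    return False

if __name__ == "__main__":
    print("RUNNING" if wait_until_running() else "not running yet")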
Authenticating with Dremio Using Dremio PAT
Below is an example connector configuration that writes data from a Kafka topic to an Iceberg table in Open Catalog, using a personal access token (PAT) for authentication:

```json
{
  "name": "iceberg-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "<your_topic>",
    "iceberg.tables": "<namespace>.<table>",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "http://<dremio_address>:8181/api/catalog",
    "iceberg.catalog.warehouse": "default",
    "iceberg.catalog.header.X-Iceberg-Access-Delegation": "vended-credentials",
    "iceberg.catalog.rest.auth.type": "com.dremio.iceberg.authmgr.oauth2.OAuth2Manager",
    "iceberg.catalog.rest.auth.oauth2.token-endpoint": "http://<dremio_address>:9047/oauth/token",
    "iceberg.catalog.rest.auth.oauth2.grant-type": "urn:ietf:params:oauth:grant-type:token-exchange",
    "iceberg.catalog.rest.auth.oauth2.client-id": "dremio",
    "iceberg.catalog.rest.auth.oauth2.client-auth": "none",
    "iceberg.catalog.rest.auth.oauth2.scope": "dremio.all",
    "iceberg.catalog.rest.auth.oauth2.token-exchange.subject-token": "<personal_access_token>",
    "iceberg.catalog.rest.auth.oauth2.token-exchange.subject-token-type": "urn:ietf:params:oauth:token-type:dremio:personal-access-token"
  }
}
```
Replace:

- `<your_topic>` with the Kafka topic to consume.
- `<namespace>.<table>` with the target Iceberg table in Open Catalog.
- `<dremio_address>` with the hostname of your Dremio instance.
- `<personal_access_token>` with your Dremio PAT.
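If you keep the configuration template under version control, the placeholder substitution can be scripted rather than done by hand. A minimal sketch using only the standard library; the placeholder names match the example above, while the input and output file names are assumptions:

```python
import json

def fill_placeholders(config, values):
    """Return a copy of a connector config with <placeholder> tokens
    replaced wherever they appear inside string values."""
    def subst(v):
        if isinstance(v, str):
            for key, val in values.items():
                v = v.replace(key, val)
        return v
    return {k: ({ik: subst(iv) for ik, iv in v.items()}
                if isinstance(v, dict) else subst(v))
            for k, v in config.items()}

if __name__ == "__main__":
    with open("connector-config.json") as f:
        template = json.load(f)
    filled = fill_placeholders(template, {
        "<your_topic>": "events",
        "<namespace>.<table>": "sales.orders",
        "<dremio_address>": "dremio.example.com",
        "<personal_access_token>": "my-pat-value",
    })
    with open("connector-config.filled.json", "w") as f:
        json.dump(filled, f, indent=2)
```

Keeping the PAT out of the committed template and injecting it at deploy time avoids leaking credentials into version control.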
Authenticating with Dremio Using OAuth2 (External Identity Provider)
Below is an example connector configuration that uses an external IdP for user authentication. The Dremio Auth Manager obtains a JSON Web Token (JWT) from the IdP using the client_credentials grant type, then exchanges it for a Dremio OAuth access token.
```json
{
  "name": "iceberg-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "<your_topic>",
    "iceberg.tables": "<namespace>.<table>",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "http://<dremio_address>:8181/api/catalog",
    "iceberg.catalog.warehouse": "default",
    "iceberg.catalog.header.X-Iceberg-Access-Delegation": "vended-credentials",
    "iceberg.catalog.rest.auth.type": "com.dremio.iceberg.authmgr.oauth2.OAuth2Manager",
    "iceberg.catalog.rest.auth.oauth2.issuer-url": "http://<dremio_address>:9047/oauth/token",
    "iceberg.catalog.rest.auth.oauth2.grant-type": "urn:ietf:params:oauth:grant-type:token-exchange",
    "iceberg.catalog.rest.auth.oauth2.scope": "dremio.all",
    "iceberg.catalog.rest.auth.oauth2.client-id": "dremio",
    "iceberg.catalog.rest.auth.oauth2.client-auth": "none",
    "iceberg.catalog.rest.auth.oauth2.token-exchange.subject-token.issuer-url": "http://<idp_address>:8080/realms/iceberg",
    "iceberg.catalog.rest.auth.oauth2.token-exchange.subject-token.grant-type": "client_credentials",
    "iceberg.catalog.rest.auth.oauth2.token-exchange.subject-token.scope": "<scope>",
    "iceberg.catalog.rest.auth.oauth2.token-exchange.subject-token.client-id": "<client_id>",
    "iceberg.catalog.rest.auth.oauth2.token-exchange.subject-token.client-secret": "<client_secret>",
    "iceberg.catalog.rest.auth.oauth2.token-exchange.subject-token-type": "urn:ietf:params:oauth:token-type:jwt"
  }
}
```
Replace:

- `<your_topic>` with the Kafka topic to consume.
- `<namespace>.<table>` with the target Iceberg table in Open Catalog.
- `<dremio_address>` with the hostname of your Dremio instance.
- `<idp_address>` with the hostname of your IdP (Keycloak in this example).
- `<client_id>` and `<client_secret>` with the credentials issued by your IdP.
- `<scope>` with the scope to request from the IdP.
- Since Kafka Connect runs as a background service, it's not possible to use OAuth2 grant types that require interactive user logins.
- The main OAuth2 settings (`issuer-url`, `grant-type`, `scope`, `client-id`) point to the Dremio token endpoint and configure the token exchange flow.
- The `token-exchange.subject-token.*` settings configure how the Dremio Auth Manager obtains the subject token from the external IdP.
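The two-step flow those settings describe can be sketched directly against the two token endpoints, which is useful when debugging why the Auth Manager is rejected by either side. This is illustrative only: the Auth Manager performs these requests for you at runtime, the Keycloak token endpoint path is an assumption derived from the realm URL above, and the placeholders are the same ones used in the configuration example.

```python
import json
import urllib.parse
import urllib.request

def client_credentials_body(client_id, client_secret, scope):
    """Form body for step 1: obtain a JWT from the external IdP."""
    return urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": scope,
    })

def token_exchange_body(subject_token):
    """Form body for step 2: exchange the IdP JWT for a Dremio access token."""
    return urllib.parse.urlencode({
        "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
        "client_id": "dremio",
        "scope": "dremio.all",
        "subject_token": subject_token,
        "subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
    })

def post_form(url, body):
    """POST a URL-encoded form and decode the JSON response."""
    req = urllib.request.Request(
        url, data=body.encode(),
        headers={"Content-Type": "application/x-www-form-urlencoded"})
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)

if __name__ == "__main__":
    # Step 1: client_credentials against the IdP (Keycloak-style path assumed).
    idp = post_form(
        "http://<idp_address>:8080/realms/iceberg/protocol/openid-connect/token",
        client_credentials_body("<client_id>", "<client_secret>", "<scope>"))
    # Step 2: token exchange against Dremio using the IdP JWT as subject token.
    dremio = post_form("http://<dremio_address>:9047/oauth/token",
                       token_exchange_body(idp["access_token"]))
    print("got Dremio token:", bool(dremio.get("access_token")))
```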