Apache Kafka
Connect to Apache Kafka#
Connecting to Apache Kafka in Apache APISIX is very simple.
Currently, we provide a simpler way to integrate by combining two APIs, ListOffsets and Fetch, to quickly implement the ability to pull Kafka messages. Still, they do not support Apache Kafka's consumer group feature for now and cannot be managed for offsets by Apache Kafka.
Limitations#
- Offsets need to be managed manually
They can be stored by a custom backend service or obtained via the list_offset command before starting to fetch the message, which can use timestamp to get the starting offset, or to get the initial and end offsets.
- Unsupported batch data acquisition
A single instruction can only obtain the data of a Topic Partition, does not support batch data acquisition through a single instruction
Prepare#
First, it is necessary to compile the communication protocol as a language-specific SDK using the protoc, which provides the command and response definitions to connect to Kafka via APISIX using the WebSocket.
The sequence field in the protocol is used to associate the request with the response, they will correspond one to one, the client can manage it in the way they want, APISIX will not modify it, only pass it back to the client through the response body.
The following commands are currently used by Apache Kafka connect:
- CmdKafkaFetch
- CmdKafkaListOffset
The
timestampfield in theCmdKafkaListOffsetcommand supports the following value:
unix timestamp: Offset of the first message after the specified timestamp
-1:Offset of the last message of the current Partition
-2:Offset of the first message of current PartitionFor more information, see Apache Kafka Protocol Documentation
Possible response body: When an error occurs, ErrorResp will be returned, which includes the error string; the rest of the response will be returned after the execution of the particular command.
- ErrorResp
- KafkaFetchResp
- KafkaListOffsetResp
How to use#
Create route#
Create a route, set the upstream scheme field to kafka, and configure nodes to be the address of the Kafka broker.
curl -X PUT 'http://127.0.0.1:9080/apisix/admin/routes/kafka' \
    -H 'X-API-KEY: <api-key>' \
    -H 'Content-Type: application/json' \
    -d '{
    "uri": "/kafka",
    "upstream": {
        "nodes": {
            "kafka-server1:9092": 1,
            "kafka-server2:9092": 1,
            "kafka-server3:9092": 1
        },
        "type": "none",
        "scheme": "kafka"
    }
}'
After configuring the route, you can use this feature.
Enabling TLS and SASL/PLAIN authentication#
Simply turn on the kafka-proxy plugin on the created route and enable the Kafka TLS handshake and SASL authentication through the configuration, which can be found in the plugin documentation.
curl -X PUT 'http://127.0.0.1:9080/apisix/admin/routes/kafka' \
    -H 'X-API-KEY: <api-key>' \
    -H 'Content-Type: application/json' \
    -d '{
    "uri": "/kafka",
    "plugins": {
        "kafka-proxy": {
            "sasl": {
                "username": "user",
                "password": "pwd"
            }
        }
    },
    "upstream": {
        "nodes": {
            "kafka-server1:9092": 1,
            "kafka-server2:9092": 1,
            "kafka-server3:9092": 1
        },
        "type": "none",
        "scheme": "kafka",
        "tls": {
            "verify": true
        }
    }
}'