AMQ Streams on RHEL - Example Kafka client with Kerberos authentication

This article describes the configuration of an example client to authenticate with a Kerberos-enabled broker cluster.

Example client using Kerberos authentication

The client is a Spring Boot Camel application, using the fabric8-project-bom-camel-spring-boot BOM for dependency management.

Apart from the general dependencies of a Camel Spring Boot application,
the only explicit dependency is the camel-kafka artifact from org.apache.camel, which pulls in additional artifacts as transitive dependencies.

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
<groupId>com.redhat.example</groupId>
<artifactId>camel-springboot-kafka-consumer</artifactId>
<version>1.0-SNAPSHOT</version>
<name>Fabric8 :: Quickstarts :: Spring-Boot :: Camel Kafka :: Consumer</name>
<description>Spring Boot example running a Camel Kafka Consumer</description>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <fabric8.maven.plugin.version>3.0.11.fuse-000065-redhat-3</fabric8.maven.plugin.version>
    <fabric8.version>3.0.11.fuse-000065-redhat-3</fabric8.version>
    <spring-boot.version>1.4.1.RELEASE</spring-boot.version>
    <maven-compiler-plugin.version>3.3</maven-compiler-plugin.version>
    <maven-surefire-plugin.version>2.18.1</maven-surefire-plugin.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.fabric8</groupId>
            <artifactId>fabric8-project-bom-camel-spring-boot</artifactId>
            <version>${fabric8.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>javax.inject</groupId>
        <artifactId>javax.inject</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <!-- testing -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.jboss.arquillian.junit</groupId>
        <artifactId>arquillian-junit-container</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.fabric8</groupId>
        <artifactId>fabric8-arquillian</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <defaultGoal>spring-boot:run</defaultGoal>

    <plugins>

        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>${maven-compiler-plugin.version}</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>${maven-surefire-plugin.version}</version>
            <inherited>true</inherited>
            <configuration>
                <excludes>
                    <exclude>**/*KT.java</exclude>
                </excludes>
            </configuration>
        </plugin>

        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <version>${spring-boot.version}</version>
            <executions>
                <execution>
                    <goals>
                        <goal>repackage</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

    </plugins>

</build>
</project>

Example consumer application

The consumer is a Spring configuration for consuming messages from a Kafka topic and logging them.

The Spring Camel context XML configuration for the consumer application is saved in the project at src/main/resources/spring/camel-context.xml.

Relevant properties referenced in the Camel context are set in a Spring src/main/resources/application.properties file.

logging.config=classpath:logback.xml
camel.springboot.name=CamelKafkaConsumer <1>
server.address=0.0.0.0 <2>
management.address=0.0.0.0
management.port=8081 <3>
endpoints.enabled = false <4>
endpoints.health.enabled = true
kafka.bootstrap.uri=node1.example.redhat.com:9092,node2.example.redhat.com:9092,node3.example.redhat.com:9092
kafka.serializerClass=kafka.serializer.StringEncoder
consumer.topic=Topic1 <5>
consumer.group=consumer_group_1
consumer.max.poll.records=1
consumer.threads=10
consumer.consumersCount=1 <6>
consumer.auto.offset.reset=earliest <7>
consumer.auto.commit.enable=true
consumer.receive.buffer.bytes=-1 <8>
security.protocol = SASL_SSL <9>
ssl.truststore.location = /opt/fuse/keystore/server.chain.pkcs12
ssl.truststore.password = changepass
ssl.truststore.type = PKCS12
sasl.mechanism = GSSAPI <10>
sasl.kerberos.service.name=kafka <11>
sasl.jaas.config = com.sun.security.auth.module.Krb5LoginModule required debug="true" useKeyTab="true" storeKey="true" keyTab="/home/example/krb5/kafka-client.keytab" principal="kafka-client@EXAMPLE.REDHAT.COM"; <12>

<1> Configuration options from org.apache.camel.spring.boot.CamelConfigurationProperties.
<2> Listen on all network interfaces.
<3> Specify a separate management port so that port 8080 stays available for application HTTP requests.
<4> Disable all management endpoints except health.
<5> Consumer properties. One consumer can listen to more than one topic (for example, Topic1,Topic2).
<6> Number of consumers that connect to the Kafka server.
<7> Reset behavior if offset information is missing.
<8> A value of -1 uses the operating system default for the consumer receive buffer (the Kafka default is 65536 bytes).
<9> Specifies SASL authentication for the broker, in addition to the transport-level SSL negotiation.
<10> The login module uses Kerberos GSSAPI authentication.
<11> The Kerberos service name that the client accesses. It must match the sasl.kerberos.service.name attribute configured for the broker when setting up Kerberos authentication.
<12> The JAAS configuration passed as a URI parameter.

  • com.sun.security.auth.module.Krb5LoginModule required specifies that the client must log in through the Krb5LoginModule, the JAAS login module used for SASL/GSSAPI negotiation.
  • keyTab points to the location of the kafka-client.keytab file, copied from the krb5 KDC when the security principals were set up during configuration of the Kerberos domain.
  • principal is the Kerberos principal (user name) that authenticates using the keytab.
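
The same security settings can also be assembled programmatically. The following is a minimal sketch using only the JDK, not part of the example project: the class name KerberosClientProps and its helper methods are hypothetical, the property keys and values are the ones listed above, and the truststore password and the debug option are deliberately left out.

```java
import java.util.Properties;

// Hypothetical helper, not part of the example project.
class KerberosClientProps {

    // Builds a sasl.jaas.config value for keytab-based login with Krb5LoginModule.
    public static String jaasConfig(String keyTabPath, String principal) {
        return "com.sun.security.auth.module.Krb5LoginModule required"
                + " useKeyTab=\"true\""
                + " storeKey=\"true\""
                + " keyTab=\"" + keyTabPath + "\""
                + " principal=\"" + principal + "\";";
    }

    // Collects the Kerberos and TLS client settings shown in application.properties.
    // The truststore password is omitted here and must be supplied separately.
    public static Properties securityProperties(String keyTabPath, String principal) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "GSSAPI");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        props.setProperty("sasl.jaas.config", jaasConfig(keyTabPath, principal));
        props.setProperty("ssl.truststore.location", "/opt/fuse/keystore/server.chain.pkcs12");
        props.setProperty("ssl.truststore.type", "PKCS12");
        return props;
    }

    public static void main(String[] args) {
        Properties props = securityProperties(
                "/home/example/krb5/kafka-client.keytab",
                "kafka-client@EXAMPLE.REDHAT.COM");
        // Print each setting in key=value form for inspection.
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Building the JAAS string in one place keeps the keytab path and principal from drifting apart between the consumer and producer configurations.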

Example producer application

The producer configuration is similar to the consumer configuration.
Its to URI uses the same Kerberos-related configuration properties as the from URI in the consumer configuration.
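
To illustrate how such an endpoint URI might carry the Kerberos settings, the following sketch builds a URI string with plain JDK code. It is an assumption, not the article's route definition: the class KafkaEndpointUri is hypothetical, the kafka:topic?options layout and the option names (brokers, securityProtocol, saslMechanism, saslKerberosServiceName) are taken from the camel-kafka component and should be checked against the Camel version in use, and {{kafka.bootstrap.uri}} is a Camel property placeholder for the value in application.properties.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the example project.
class KafkaEndpointUri {

    // Joins the options into a query string and prefixes the kafka:<topic> scheme.
    public static String build(String topic, Map<String, String> options) {
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "kafka:" + topic + "?" + query;
    }

    // Kerberos-related options shared by the consumer (from) and producer (to) URIs.
    public static Map<String, String> kerberosOptions(String brokers) {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("brokers", brokers);
        options.put("securityProtocol", "SASL_SSL");
        options.put("saslMechanism", "GSSAPI");
        options.put("saslKerberosServiceName", "kafka");
        return options;
    }

    public static void main(String[] args) {
        System.out.println(build("TestLog", kerberosOptions("{{kafka.bootstrap.uri}}")));
    }
}
```

The truststore and JAAS settings would be added to the option map in the same way.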

The Spring Camel context XML configuration for the producer application is saved in the project at src/main/resources/spring/camel-context.xml.

The producer application adds a configurable timer route that sends messages to the Kafka cluster at a specified interval.

Configuration properties are similar to those of the consumer application.
Relevant properties referenced in the Camel context are set in the Spring src/main/resources/application.properties file.

logging.config=classpath:logback.xml
camel.springboot.name=CamelKafkaProducer
server.address=0.0.0.0
management.address=0.0.0.0
management.port=8082 <1>
server.port=8090
endpoints.enabled = false
endpoints.health.enabled = true
kafka.bootstrap.uri=node1.example.redhat.com:9092,node2.example.redhat.com:9092,node3.example.redhat.com:9092
kafka.serializerClass=kafka.serializer.StringEncoder
send.interval=1
producer.topic=TestLog
producer.partitioner=com.redhat.example.kafka.StringPartitioner
producer.require.acks=all
security.protocol = SASL_SSL
ssl.truststore.location = /opt/fuse/keystore/server.chain.pkcs12
ssl.truststore.password = changepass
ssl.truststore.type = PKCS12
sasl.mechanism = GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config = com.sun.security.auth.module.Krb5LoginModule required debug="true" useKeyTab="true" storeKey="true" keyTab="/home/example/krb5/kafka-client.keytab" principal="kafka-client@EXAMPLE.REDHAT.COM";

<1> The management port (8082) and server port (8090) of the producer application differ from those of the consumer application (8081 and the default 8080) to avoid conflicts when both run on the same host.

Kerberos configuration for the client

In addition to the URI parameters, the consumer and producer need the realm information from the krb5.conf file used for Kerberos authentication
whenever that information differs from the default Kerberos realm.

The location of the krb5 configuration is specified for the client as a JVM parameter.
The default location on Red Hat Enterprise Linux systems is /etc/krb5.conf.

To use an alternate configuration, use the JVM parameter at startup with the correct path to the krb5.conf file:

-Djava.security.krb5.conf=/home/example/krb5/krb5.conf
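
A quick way to confirm which file a client will pick up is to read the system property at startup. The following sketch is hypothetical (the class Krb5ConfCheck is not part of the example project) and simplified: it only mirrors the two locations mentioned here, whereas the JVM's own lookup may consult additional locations.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical startup check, not part of the example project.
class Krb5ConfCheck {

    // Returns the path set through -Djava.security.krb5.conf,
    // or /etc/krb5.conf when the property is not set.
    public static Path resolveKrb5Conf() {
        String configured = System.getProperty("java.security.krb5.conf");
        boolean isSet = configured != null && !configured.isEmpty();
        return Paths.get(isSet ? configured : "/etc/krb5.conf");
    }

    public static void main(String[] args) {
        Path conf = resolveKrb5Conf();
        String state = Files.isReadable(conf) ? "readable" : "missing or unreadable";
        System.out.println("Kerberos configuration: " + conf + " (" + state + ")");
    }
}
```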

Message payload for the producer

To complete the producer configuration, a utility class is used to generate a message payload.

This Camel processor class generates random strings of 10 printable ASCII characters:

package com.redhat.example.kafka;

import java.util.Random;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;

public class RandomStringGenerator implements Processor {

    @Override
    public void process(Exchange exchng) throws Exception {
        StringBuilder builder = new StringBuilder();
        Random random = new Random();
        // Append 10 characters drawn from code points 32 (space) to 125 ('}')
        for (int j = 0; j < 10; j++) {
            builder.append((char) (random.nextInt(94) + 32));
        }
        // Set the generated string as the body of the outgoing message
        exchng.getOut().setBody(builder.toString());
    }
}
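
The generation logic can be tried outside Camel. The following sketch is a standalone variant, not part of the example project: the class RandomPayload and its length parameter are hypothetical additions, and the character range is the same as in the processor above.

```java
import java.util.Random;

// Hypothetical standalone variant of the payload generator.
class RandomPayload {

    // Returns `length` characters with code points from 32 to 125 inclusive.
    public static String generate(Random random, int length) {
        StringBuilder builder = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            builder.append((char) (random.nextInt(94) + 32));
        }
        return builder.toString();
    }

    public static void main(String[] args) {
        // Print one 10-character payload, as the processor would produce.
        System.out.println(generate(new Random(), 10));
    }
}
```

Passing the Random instance in makes the output reproducible when a fixed seed is used, which helps when testing.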

Application class

An application class bootstraps the Spring Camel context for the consumer or producer application.

package com.redhat.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ImportResource;

@SpringBootApplication <1>
// load regular Spring XML file from the classpath that contains the Camel XML DSL
@ImportResource({"classpath:spring/camel-context.xml"}) <2>
public class Application {

    // A main method to start this application
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

<1> The @SpringBootApplication annotation marks the class as the Spring Boot application class, enabling auto-configuration and component scanning.
<2> The @ImportResource annotation informs the Spring container that there is a configuration resource at the specified location.
In this case, the resource is an XML configuration file containing a Camel context and route definition.

Running the consumer application

When the consumer application is built and run, an initial batch of warning messages is generated because the partitions and topics have not yet been created within the broker cluster.

Once the partitions and topics have been created, the successful login messages are displayed in the logs:

[Camel (CamelKafkaConsumer) thread #1 - KafkaConsumer[TestLog]] INFO  o.a.k.c.s.a.AbstractLogin - Successfully logged in.
[kafka-kerberos-refresh-thread-kafka-client@EXAMPLE.REDHAT.COM] INFO  o.a.k.c.s.kerberos.KerberosLogin - [Principal=kafka-client@EXAMPLE.REDHAT.COM]: TGT refresh thread started.
[kafka-kerberos-refresh-thread-kafka-client@EXAMPLE.REDHAT.COM] INFO  o.a.k.c.s.kerberos.KerberosLogin - [Principal=kafka-client@EXAMPLE.REDHAT.COM]: TGT valid starting at: _date_
[kafka-kerberos-refresh-thread-kafka-client@EXAMPLE.REDHAT.COM] INFO  o.a.k.c.s.kerberos.KerberosLogin - [Principal=kafka-client@EXAMPLE.REDHAT.COM]: TGT expires: _date_
[kafka-kerberos-refresh-thread-kafka-client@EXAMPLE.REDHAT.COM] INFO  o.a.k.c.s.kerberos.KerberosLogin - [Principal=kafka-client@EXAMPLE.REDHAT.COM]: TGT refresh sleeping until: _date_
# ...

Running the producer application

When the producer application is built and run, message traffic is logged. The following lines show the random payloads as they are logged by the consumer route:

[Camel (CamelKafkaConsumer) thread #1 - KafkaConsumer[TestLog]] INFO  kafka-consumer-route - =0JH{qVbPJ
[Camel (CamelKafkaConsumer) thread #1 - KafkaConsumer[TestLog]] INFO  kafka-consumer-route - C."0h"fZII
[Camel (CamelKafkaConsumer) thread #1 - KafkaConsumer[TestLog]] INFO  kafka-consumer-route - $HKdvexx>U
# ...