First Experience with Apache Kafka


Background

During my years at Alibaba I used the in-house TT and MetaQ messaging systems, so I never touched Kafka. Having long heard of its reputation, today I finally get a chance to try it out.

First Steps

How It Works

The log-anatomy figure in the official documentation (not reproduced here) shows:

  • 1 topic.
  • 4 partitions: P1-P4.
  • 2 producers, independent of each other, both publishing.
  • Records with the same key are written to the same partition (see the sketch after this list).
  • The two producers may therefore write to the same partition.
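
To make the key-to-partition bullet concrete, here is a minimal sketch, assuming a local broker on localhost:9092 and the quickstart-events topic created further below; the key user-42 is made up for the example. It sends two records with the same key and prints the partition each one landed in (both lines should report the same partition):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KeyPartitionDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // The default partitioner hashes the record key, so records that
            // share a key always map to the same partition.
            for (String value : new String[]{"v1", "v2"}) {
                RecordMetadata meta = producer
                        .send(new ProducerRecord<>("quickstart-events", "user-42", value))
                        .get();
                System.out.printf("key=user-42 value=%s -> partition %d%n", value, meta.partition());
            }
        }
    }
}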

Download

Download the binary package, version 3.0.0:

https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz

Alternative download (mirror; note this is the older 2.8.1 release):

wget "https://mirrors.bfsu.edu.cn/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz" --no-check-certificate --user-agent="Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US) AppleWebKit/534.16 (KHTML, like Gecko) Chrome/10.0.648.204 Safari/534.16"

Another alternative (Apache archive; the older 2.3.1 release):

wget https://archive.apache.org/dist/kafka/2.3.1/kafka_2.12-2.3.1.tgz

Deployment

$ tar -xzf kafka_2.13-3.0.0.tgz
$ cd kafka_2.13-3.0.0

Start ZooKeeper

$ bin/zookeeper-server-start.sh config/zookeeper.properties

Start the Kafka broker

$ bin/kafka-server-start.sh config/server.properties

Create a topic

Create a topic with partitions set to 4 and replication.factor set to 1:

$ bin/kafka-topics.sh --create --topic quickstart-events --replication-factor 1 --partitions 4 --bootstrap-server localhost:9092

Output

Created topic quickstart-events.

List and describe the topic

$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

Output

Topic: quickstart-events        TopicId: 1_6zeEyHRCmOXSxItl88JA PartitionCount: 4       ReplicationFactor: 1    Configs: segment.bytes=1073741824
Topic: quickstart-events        Partition: 0    Leader: 0       Replicas: 0     Isr: 0
Topic: quickstart-events        Partition: 1    Leader: 0       Replicas: 0     Isr: 0
Topic: quickstart-events        Partition: 2    Leader: 0       Replicas: 0     Isr: 0
Topic: quickstart-events        Partition: 3    Leader: 0       Replicas: 0     Isr: 0

Write events

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event

Press Ctrl-C to stop writing.

Consume events

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

Press Ctrl-C to stop consuming.

Shutting Down Kafka Gracefully

Stop the producer and consumer clients with Ctrl-C.

Then stop the Kafka broker with Ctrl-C.

Finally, stop the ZooKeeper server with Ctrl-C.

APIs

Admin API

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>3.0.0</version>
</dependency>
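
The dependency alone says little, so here is a minimal sketch of the Admin API, assuming a broker on localhost:9092; the topic name admin-demo-events is made up. It creates a topic programmatically (mirroring the kafka-topics.sh step above) and then lists the cluster's topics:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class AdminDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Create a topic with 4 partitions and a replication factor of 1.
            admin.createTopics(Collections.singletonList(
                    new NewTopic("admin-demo-events", 4, (short) 1))).all().get();
            // Print every topic name known to the cluster.
            System.out.println(admin.listTopics().names().get());
        }
    }
}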

Producer API

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>3.0.0</version>
</dependency>

Consumer API

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>3.0.0</version>
</dependency>

Kafka Streams API

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-streams</artifactId>
	<version>3.0.0</version>
</dependency>

or, for Scala:

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-streams-scala_2.13</artifactId>
	<version>3.0.0</version>
</dependency>
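
As a quick taste of the Streams API, here is a minimal sketch, assuming a broker on localhost:9092; the application id and the output topic quickstart-events-upper are made up. It upper-cases every value read from quickstart-events and writes the result to a second topic:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class StreamsDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "quickstart-uppercase");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Read from the input topic, upper-case each value, write to the output topic.
        builder.stream("quickstart-events")
               .mapValues(v -> v.toString().toUpperCase())
               .to("quickstart-events-upper");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        // Close the topology cleanly on JVM shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}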

Kafka Connect API
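
For building custom source or sink connectors, the Connect framework's public interfaces ship in their own artifact (connect-api):

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>connect-api</artifactId>
	<version>3.0.0</version>
</dependency>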

First Steps with Username/Password Authentication

Consumer configuration

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/admin/kafka_2.13-3.0.0/jaas.conf"

bin/kafka-console-consumer.sh --topic my_topic --from-beginning --bootstrap-server ip:9095 --consumer.config config/consumer.properties

where:

jaas.conf (the KafkaServer section is the broker-side login configuration; the console consumer itself authenticates via the inline sasl.jaas.config in consumer.properties below):

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="username"
    password="password";
};

consumer.properties:

security.protocol=SASL_PLAINTEXT 
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
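
The SCRAM credential referenced above must exist on the broker before the consumer can authenticate. Here is a minimal sketch of registering it through the Admin API (available since Kafka 2.7), assuming an admin-capable listener on localhost:9092; the user name, password, and iteration count are illustrative:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ScramCredentialInfo;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;

import java.util.Collections;
import java.util.Properties;

public class ScramUserDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Upsert a SCRAM-SHA-256 credential for "username" so clients can
            // authenticate with it, matching consumer.properties above.
            admin.alterUserScramCredentials(Collections.singletonList(
                    new UserScramCredentialUpsertion(
                            "username",
                            new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096),
                            "password")))
                 .all().get();
        }
    }
}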

Java Read/Write Examples

Dependency

<dependencies>
	<dependency>
		<groupId>org.apache.kafka</groupId>
		<artifactId>kafka-clients</artifactId>
		<version>2.7.0</version>
	</dependency>
</dependencies>

Writing with Java

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaWriterDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker address:port list.
        props.put("bootstrap.servers", "192.168.16.152:9092,192.168.16.153:9092,192.168.16.154:9092");
        /* acks controls how many acknowledgements the producer requires before a send
           is considered successful: 0 = no acknowledgement, 1 = leader only,
           -1 or all = the full ISR set must confirm. */
        props.put("acks", "all");
        /* With retries > 0, a failed send is automatically retried up to that many times. */
        props.put("retries", 0);
        /* buffer.memory, batch.size and linger.ms control the buffer size and send delay;
           see the producer configuration section of the official docs for details. */
        props.put("batch.size", 16384);
        /* key.serializer and value.serializer specify how the key and value are serialized. */
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
            Future<RecordMetadata> future =
                    producer.send(new ProducerRecord<>("topic_name", "hello 1"));
            producer.flush();
            // future.get() blocks until the broker acknowledges the record,
            // so a send failure surfaces here.
            System.out.println(future.get());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

Reading with Java

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The node IP addresses are shown on the cluster details page after the cluster is provisioned.
        props.put("bootstrap.servers", "10.1.1.1:9095");
        props.setProperty("group.id", "dev");
        props.setProperty("client.id", "d1ff077b6ac21c7757ab9bed39a71245");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // If authentication is enabled, add the following; the username and password are configured under permission management.
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required\n" +
                "username=\"admin\"\n" +
                "password=\"123456\";");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("st_abeffect_note_ri"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

References

  1. Kafka official documentation
  2. Reading from and writing to Kafka data streams in Java