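A First Look at Apache Kafka
Background
I previously worked at Alibaba, where I used the group's in-house TT and MetaQ and never touched Kafka. I have long known Kafka by reputation, and today I finally have a chance to try it.
First look
How it works
As shown in the figure above:
- 1 topic
- 4 partitions: P1-P4.
- 2 producers, independent of each other, both publishing.
- Records with the same key are written to the same partition.
- Two producers may write to the same partition.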
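The key-to-partition rule in the list above can be sketched in a few lines of Java. This is a simplified illustration, not Kafka's own code: Kafka's default partitioner hashes the serialized key bytes with murmur2, whereas the hypothetical `partitionFor` helper below uses `Arrays.hashCode` as a stand-in. The property it demonstrates is the same: the partition depends only on the key and the partition count, so independent producers agree on where a given key goes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {
    // Hypothetical helper: maps a key to one of numPartitions partitions.
    // Assumption: Arrays.hashCode stands in for the murmur2 hash Kafka really uses.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the remainder is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 4; // P1-P4 in the figure
        // Two independent producers computing the partition for the same key.
        int fromProducerA = partitionFor("order-42", numPartitions);
        int fromProducerB = partitionFor("order-42", numPartitions);
        System.out.println(fromProducerA == fromProducerB); // prints "true"
    }
}
```

Because the partition count is part of the calculation, changing the number of partitions of a topic can move a key to a different partition.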
Download
Download the binary package, version 3.0.0:
https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz
Alternative download location (a mirror; note that this URL points at version 2.8.1):
wget "https://mirrors.bfsu.edu.cn/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz" --no-check-certificate --user-agent="Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US) AppleWebKit/534.16 (KHTML, like Gecko) Chrome/10.0.648.204 Safari/534.16"
Alternative download location (the Apache archive; note that this URL points at version 2.3.1):
wget https://archive.apache.org/dist/kafka/2.3.1/kafka_2.12-2.3.1.tgz
Deploy
$ tar -xzf kafka_2.13-3.0.0.tgz
$ cd kafka_2.13-3.0.0
Start ZooKeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Start the Kafka broker (in a second terminal, since ZooKeeper keeps running in the foreground)
$ bin/kafka-server-start.sh config/server.properties
Create a topic
Create a topic with 4 partitions and a replication factor of 1.
$ bin/kafka-topics.sh --create --topic quickstart-events --replication-factor 1 --partitions 4 --bootstrap-server localhost:9092
Result
Created topic quickstart-events.
View topics
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Result
Topic: quickstart-events TopicId: 1_6zeEyHRCmOXSxItl88JA PartitionCount: 4 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: quickstart-events Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: quickstart-events Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: quickstart-events Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Write events
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
Press Ctrl-C to stop writing.
Consume events
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
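Press Ctrl-C to stop consuming.
Shutting Kafka down gracefully
Press Ctrl-C to stop the producer and consumer clients.
Press Ctrl-C to stop the Kafka broker.
Press Ctrl-C to stop the ZooKeeper server.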
APIs
Admin API
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
Producer API
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
Consumer API
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
Kafka Streams API
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.0.0</version>
</dependency>
Or, for Scala:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-scala_2.13</artifactId>
<version>3.0.0</version>
</dependency>
Kafka Connect API
Omitted.
A first look at username/password authentication
Consumer configuration
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/admin/kafka_2.13-3.0.0/jaas.conf"
bin/kafka-console-consumer.sh --topic my_topic --from-beginning --bootstrap-server ip:9095 --consumer.config config/consumer.properties
where:
### jaas.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="username"
password="password"
};
### consumer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
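The same client settings can also be loaded from Java with `java.util.Properties`. The following is a minimal sketch under stated assumptions: the class name `ConsumerConfigSketch` and the `load` helper are hypothetical, and the file content is inlined as a string only to keep the example self-contained (a real program would read config/consumer.properties from disk).

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class ConsumerConfigSketch {
    // Same content as consumer.properties above, inlined for the sketch.
    static final String CONSUMER_PROPERTIES = String.join("\n",
        "security.protocol=SASL_PLAINTEXT",
        "sasl.mechanism=SCRAM-SHA-256",
        "sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required"
            + " username=\"username\" password=\"password\";");

    // Hypothetical helper: parses .properties text into a Properties object.
    static Properties load(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties props = load(CONSUMER_PROPERTIES);
        System.out.println(props.getProperty("sasl.mechanism")); // prints "SCRAM-SHA-256"
    }
}
```

After adding `bootstrap.servers`, `group.id` and the two deserializer settings, a `Properties` object like this is what a Java consumer would be constructed from.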
Java read/write example
Dependencies
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.0</version>
</dependency>
</dependencies>
Writing from Java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaWriterDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker addresses and ports.
        props.put("bootstrap.servers", "192.168.16.152:9092,192.168.16.153:9092,192.168.16.154:9092");
        // acks controls how many acknowledgements the producer requires before a send counts as successful.
        // 0: do not wait for any acknowledgement. 1: wait for the leader only.
        // -1 or all: wait until every replica in the ISR has acknowledged.
        props.put("acks", "all");
        // When retries > 0, a failed send is retried automatically, up to the configured number of times.
        props.put("retries", 0);
        // buffer.memory, batch.size and linger.ms control the buffer size and the send delay;
        // see the producer configuration section of the official documentation for details.
        props.put("batch.size", 16384);
        // key.serializer and value.serializer choose how the user-supplied key and value are serialized.
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // try-with-resources closes the producer on exit, which also flushes buffered records.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>("topic_name", "hello 1"));
            // Block until the broker has acknowledged the record, then print its metadata.
            RecordMetadata metadata = future.get();
            System.out.println(metadata);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Reading from Java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The node IP addresses are listed on the cluster details page once the cluster has been provisioned.
        props.setProperty("bootstrap.servers", "10.1.1.1:9095");
        props.setProperty("group.id", "dev");
        props.setProperty("client.id", "d1ff077b6ac21c7757ab9bed39a71245");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // If authentication is enabled, add the settings below; the account and password
        // are configured in the permission management section.
        props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required\n" +
                "username=\"admin\"\n" +
                "password=\"123456\";");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "SCRAM-SHA-256");

        // try-with-resources closes the consumer if the loop exits with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("st_abeffect_note_ri"));
            // Poll forever; stop the demo with Ctrl-C.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }
}