Kafka Notes (1) --- Quickstart

Category: Database

Reference (official docs): http://kafka.apache.org/quickstart

1. Download Kafka

Official download page: http://kafka.apache.org/downloads

As of July 8, 2019, the latest release is 2.3.0. In the artifact name kafka_2.12-2.3.0, 2.12 is the Scala version it was compiled against and 2.3.0 is the Kafka version.

  • Scala 2.12  - kafka_2.12-2.3.0.tgz (asc, sha512)
    Extract it:
    > tar -xzf kafka_2.12-2.3.0.tgz
    > cd kafka_2.12-2.3.0

2. Start the Server

Start ZooKeeper first. Kafka ships with a built-in single-node ZooKeeper, which you can use, or you can point Kafka at an external one.

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

3. Create a Topic

With replication-factor 1 and partitions 1:

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

List the topics to check:

> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test

You can also skip creating topics manually and let the brokers auto-create them when a message is first published.
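Auto-creation is controlled by broker settings. A minimal sketch of the relevant lines in config/server.properties (auto.create.topics.enable defaults to true; the other two set the defaults applied to auto-created topics):

```properties
# Create a topic automatically the first time an unknown topic is produced to
auto.create.topics.enable=true
# Defaults used for auto-created topics
num.partitions=1
default.replication.factor=1
```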

4. Send Some Messages

Test with the command line client; each line you type is sent as a separate message.

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

5. Start a Consumer

The command line consumer can receive the messages:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
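The --from-beginning flag only controls where the consumer starts reading in the partition log: offset 0, or the current end of the log. A toy in-memory sketch of that offset semantics (not the real client API):

```python
class PartitionLog:
    """A toy append-only log, standing in for one Kafka partition."""

    def __init__(self):
        self.messages = []

    def append(self, message):
        self.messages.append(message)

    def start_offset(self, from_beginning):
        # from_beginning=True mimics --from-beginning: start at offset 0.
        # Otherwise a fresh consumer starts at the end of the log and only
        # sees messages appended afterwards.
        return 0 if from_beginning else len(self.messages)


log = PartitionLog()
log.append("This is a message")
log.append("This is another message")

# With --from-beginning the whole log is replayed.
print(log.messages[log.start_offset(from_beginning=True):])

# Without it, only messages produced after the consumer joined are seen.
offset = log.start_offset(from_beginning=False)
log.append("a third message")
print(log.messages[offset:])
```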

6. Setting Up a Multi-Broker Cluster

A single broker is no fun, so let's expand the cluster to three brokers.

First, copy the config file for each new broker:

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

Then edit them:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

broker.id is the unique name of each node in the cluster. Because we are running all brokers on the same machine, we also have to override listeners and log.dirs so that the brokers do not fight over the same port and data directory.

Create a topic with one partition and a replication factor of three:

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic

Use describe to see what is going on:

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
A few concepts here:

  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.

  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.

  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.

Compare with the test topic from earlier:

    bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
    Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
    Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
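Alongside leaders and replicas, it helps to know how a message ends up in a given partition in the first place. When a message has a key, Kafka's default partitioner hashes the key (with murmur2) modulo the partition count, so equal keys always land in the same partition. A simplified sketch of that idea, using hashlib instead of murmur2:

```python
import hashlib


def pick_partition(key: bytes, num_partitions: int) -> int:
    # Simplified stand-in for Kafka's default partitioner: the real one
    # uses murmur2, but the key -> hash -> modulo structure is the same.
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions


# Equal keys always map to the same partition, which is what gives
# Kafka its per-key ordering guarantee.
p1 = pick_partition(b"user-42", 3)
p2 = pick_partition(b"user-42", 3)
assert p1 == p2 and 0 <= p1 < 3
```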

Produce and consume on the new topic:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

Now let's test fault-tolerance:

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

Look at the change: the leader has switched, because broker 1 was killed:

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0

The messages are still there:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
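What happened can be sketched as a tiny election rule: the killed leader drops out of the ISR, and a new leader is chosen from the replicas that are still in sync. A simplified illustration (the real controller logic is more involved):

```python
def elect_leader(replicas, isr):
    """Pick the first replica (in assignment order) that is still in the
    ISR. Simplified version of what the Kafka controller does when a
    partition leader fails."""
    for broker_id in replicas:
        if broker_id in isr:
            return broker_id
    raise RuntimeError("no in-sync replica available")


replicas = [1, 2, 0]          # assignment order from the describe output
isr = {1, 2, 0}
assert elect_leader(replicas, isr) == 1   # broker 1 starts as leader

isr.discard(1)                # kill -9 broker 1 -> it falls out of the ISR
assert elect_leader(replicas, isr) == 2   # leadership moves to broker 2
```

This matches the describe output above: after killing broker 1, Leader becomes 2 and Isr shrinks to 2,0 while Replicas still lists all three.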

7. Use Kafka Connect to Import/Export Data

So far everything has been console data. What about sources in other systems? Use Kafka Connect.

Create some seed data:

> echo -e "foo\nbar" > test.txt

Start the standalone worker, passing it the worker config plus a file-source and file-sink connector config:

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

Verify the sink file:

> more test.sink.txt
foo
bar

And on the consumer side:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

The pipeline keeps running, so you can continue appending:

> echo Another line >> test.txt
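The consumer output shows what the file source connector actually writes: each line of test.txt becomes one record, and the default JSON converter wraps the raw string in a schema/payload envelope. A minimal sketch of that transformation:

```python
import json


def to_connect_record(line: str) -> str:
    # One file line -> one record; the JsonConverter wraps the string
    # value in the schema/payload envelope shown in the consumer output.
    envelope = {
        "schema": {"type": "string", "optional": False},
        "payload": line,
    }
    return json.dumps(envelope, separators=(",", ":"))


for line in ["foo", "bar"]:
    print(to_connect_record(line))
# {"schema":{"type":"string","optional":false},"payload":"foo"}
# {"schema":{"type":"string","optional":false},"payload":"bar"}
```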

8. Use Kafka Streams

http://kafka.apache.org/22/documentation/streams/quickstart

WordCountDemo

https://github.com/apache/kafka/blob/2.2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java

Code snippet:

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde));

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // Group the text words as message keys
    .groupBy((key, value) -> value)
    // Count the occurrences of each word (message key).
    .count();

// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

Create the output topic for the demo, with log compaction enabled:

> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".

Start the WordCount demo application:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

Start a producer to write some input data:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams

Start a consumer to read the output:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
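The output above is a changelog stream: the demo emits an updated count each time a word's count changes, which is why kafka appears first with 1 and later with 2. A plain-Python sketch of the same processing, feeding in the two input lines typed above (an idealization: the real app may batch or cache updates before emitting them):

```python
import re
from collections import defaultdict


def word_count_updates(lines):
    # Mirrors the Streams topology: lowercase, split on non-word
    # characters, and emit the running count after each word, like the
    # changelog records written to streams-wordcount-output.
    counts = defaultdict(int)
    updates = []
    for line in lines:
        for word in re.split(r"\W+", line.lower()):
            if word:
                counts[word] += 1
                updates.append((word, counts[word]))
    return updates


for word, count in word_count_updates(
        ["all streams lead to kafka", "hello kafka streams"]):
    print(f"{word}\t{count}")
```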
In this world, what matters most is not where you stand, but the direction in which you are moving!


Published: 2019-07-09 (Tuesday)