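Kafka in Practice

Kafka is a high-throughput, high-performance message queue. Following the Quick Start on the official site, you can get a local environment running quickly.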

Quick Start

1. Download from the official site and extract

$ tar -xzf kafka_2.11-0.10.0.0.tgz
$ cd kafka_2.11-0.10.0.0

2. Start the server

Start ZooKeeper first; Kafka relies on it for cluster coordination. Then start the Kafka server, i.e. the broker.

# start zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2016-06-30 10:35:05,734] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

# start kafka server
$ bin/kafka-server-start.sh config/server.properties
[2016-06-30 10:35:20,354] INFO KafkaConfig values:
advertised.host.name = null
...

3. Create a topic

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# list topics
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test

4. Send messages with the producer

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
this is a message

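At this point I hit an error (on a Mac): the producer printed "WARN Error while fetching metadata" and similar messages. After some digging it turned out to be a hostname problem. Scrolling back through the ZooKeeper startup output, the host.name line looked like this.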

[2016-06-30 10:35:05,789] INFO Server environment:host.name=bogon (org.apache.zookeeper.server.ZooKeeperServer)

Why does this happen? If host.name is not set in the configuration file, the hostname is obtained through the Java API by default, as the comments in config/server.properties explain.

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = security_protocol://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

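Once the cause is known the fix is simple: either set the host explicitly in the configuration file or change the machine's hostname. The command for changing the hostname is shown below. After the change, repeat the steps above. If the problem persists, delete the previously written logs; the log directories are /tmp/kafka-logs and /tmp/zookeeper.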

sudo scutil --set HostName localhost
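
If you take the configuration-file route instead, the listeners value follows the security_protocol://host_name:port format shown in server.properties. The sketch below splits such a value and reports whether an explicit host was given. The function name parseListener is hypothetical, and this is an illustration of the format, not how Kafka itself parses the setting.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// parseListener splits a value such as "PLAINTEXT://your.host.name:9092"
// into protocol, host and port. The host part may be empty.
func parseListener(value string) (protocol, host, port string, err error) {
	parts := strings.SplitN(strings.TrimSpace(value), "://", 2)
	if len(parts) != 2 {
		return "", "", "", fmt.Errorf("listener %q is not protocol://host:port", value)
	}
	host, port, err = net.SplitHostPort(parts[1])
	if err != nil {
		return "", "", "", err
	}
	return parts[0], host, port, nil
}

func main() {
	for _, v := range []string{"PLAINTEXT://your.host.name:9092", "PLAINTEXT://:9092"} {
		protocol, host, port, err := parseListener(v)
		if err != nil {
			fmt.Println("invalid:", err)
			continue
		}
		fmt.Printf("protocol=%s host=%q port=%s explicitHost=%v\n", protocol, host, port, host != "")
	}
}
```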

5. Start a consumer to read the messages

Open another terminal and enter the command below. Then type "This is a message" in the producer's terminal, and it will show up in the consumer's terminal.

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message

Kafka Go API

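For Kafka development in Go you can use the sarama library. Below is a simple producer and consumer demo. The producer code follows; it reads strings from standard input in a loop and sends each one as a message to the topic.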

package main

import (
	"bufio"
	"fmt"
	"os"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// Wait until all in-sync replicas have acknowledged each message.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Pick a random partition for every message; the key is not consulted.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Newer sarama versions require this for a SyncProducer.
	config.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	// Read one line at a time so that a message may contain spaces.
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		value := scanner.Text()
		fmt.Println(value)

		msg := &sarama.ProducerMessage{
			Topic: "kltao",
			Key:   sarama.StringEncoder("key"),
			Value: sarama.StringEncoder(value),
		}
		partition, offset, err := producer.SendMessage(msg)
		if err != nil {
			fmt.Println("Send message Fail:", err)
			continue
		}
		fmt.Printf("Partition = %d, offset=%d\n", partition, offset)
	}
}
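
The producer above sets sarama.NewRandomPartitioner, so the fixed key "key" has no influence on which partition a message lands in. The difference between random and key-based selection can be illustrated with the standard library alone. This is a sketch of the idea, not sarama's implementation; the function names randomPartition and hashPartition and the partition count of 4 are assumptions made for the example.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math/rand"
)

// randomPartition ignores the key and picks any partition in [0, numPartitions).
func randomPartition(numPartitions int32) int32 {
	return rand.Int31n(numPartitions)
}

// hashPartition maps equal keys to the same partition in [0, numPartitions).
func hashPartition(key string, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int32(h.Sum32() % uint32(numPartitions))
}

func main() {
	const partitions = 4 // hypothetical partition count
	for _, key := range []string{"key", "key", "other"} {
		fmt.Printf("key=%q random=%d hash=%d\n",
			key, randomPartition(partitions), hashPartition(key, partitions))
	}
}
```

With a topic that has a single partition, as created in the Quick Start, both strategies necessarily give partition 0; the distinction only matters once a topic has several partitions.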

The consumer code is as follows.

package main

import (
	"fmt"
	"sync"

	"github.com/Shopify/sarama"
)

var (
	wg sync.WaitGroup
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	partitionList, err := consumer.Partitions("kltao")
	if err != nil {
		panic(err)
	}

	// Range over the partition IDs themselves, not the slice indexes.
	for _, partition := range partitionList {
		// OffsetNewest: only messages produced after startup are delivered.
		pc, err := consumer.ConsumePartition("kltao", partition, sarama.OffsetNewest)
		if err != nil {
			panic(err)
		}
		defer pc.AsyncClose()

		wg.Add(1)
		// One goroutine per partition, each draining its own message channel.
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
		}(pc)
	}
	wg.Wait()
}

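Start the producer and the consumer at the same time, then type some strings in the producer's terminal; they appear in the consumer's terminal, as shown in the figure below.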