Kafka是一种高吞吐、高性能的消息队列。参考官网 的Quick Start
可以很快在本地把环境搭起来。
Quick Start 1.官网下载并解压 1 2 $ tar -xzf kafka_2.11-0.10.0.0.tgz $ cd kafka_2.11-0.10.00
2. 启动server 先启动zookeeper做服务治理,然后启动kafka server,也就是broker
1 2 3 4 5 6 7 8 9 10 $ bin/zookeeper-server-start.sh config/zookeeper.properties [2016-06-30 10:35:05,734] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ... $ bin/kafka-server-start.sh config/server.properties [2016-06-30 10:35:20,354] INFO KafkaConfig values: advertised.host.name = null ...
3. 新建topic 1 2 3 4 $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test $ bin/kafka-topics.sh --list --zookeeper localhost:2181 test
4. Producer发送Message 1 2 $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test this is a message
到这个地方我(Mac)出错了,错误信息是WARN Error while fetching metadata
等。查了一会才发现是hostname的问题。退到启动zookeeper的地方找到host.name那一行如下所示。
1 [2016-06-30 10:35:05,789] INFO Server environment:host.name=bogon (org.apache.zookeeper.server.ZooKeeperServer)
这是怎么回事呢?因为如果我们不在配置文件中配置host.name,默认是使用java的API来得到hostname的,参见config/server.properties文件。
知道了问题解决起来就很简单了,要么在配置文件中指定,要么更改hostname,更改hostname的命令如下。改完把上面的步骤重新来过。如果还有问题,那么就把之前记录的log删除。log目录在/tmp/kafka-logs
和/tmp/zookeeper
。
1 sudo scutil --set HostName localhost
5. 启动consumer消费信息 另外起一个Terminal,键入下面命令。然后在producer的terminal里键入“This is a message”,consumer的终端就可以显示出来了。
1 2 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning This is a message
Kafka Go API 如果使用golang来开发Kafka可以使用sarama库。下面就写一个简单的生产者,消费者 Demo。生产者代码如下。可以不断输入字符串并作为message发送到topic中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package mainimport ( "fmt" "github.com/Shopify/sarama" ) func main () { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Partitioner = sarama.NewRandomPartitioner producer, err := sarama.NewSyncProducer([]string {"localhost:9092" }, config) if err != nil { panic (err) } defer producer.Close() msg := &sarama.ProducerMessage { Topic: "kltao" , Partition: int32 (-1 ), Key: sarama.StringEncoder("key" ), } var value string for { _, err := fmt.Scanf("%s" , &value) if err != nil { break } msg.Value = sarama.ByteEncoder(value) fmt.Println(value) partition, offset, err := producer.SendMessage(msg) if err != nil { fmt.Println("Send message Fail" ) } fmt.Printf("Partition = %d, offset=%d\n" , partition, offset) } }
消费者代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 package mainimport ( "fmt" "sync" "github.com/Shopify/sarama" ) var ( wg sync.WaitGroup ) func main () { consumer, err := sarama.NewConsumer([]string {"localhost:9092" }, nil ) if err != nil { panic (err) } partitionList, err := consumer.Partitions("kltao" ) if err != nil { panic (err) } for partition := range partitionList { pc, err := consumer.ConsumePartition("kltao" , int32 (partition), sarama.OffsetNewest) if err != nil { panic (err) } defer pc.AsyncClose() wg.Add(1 ) go func (sarama.PartitionConsumer) { defer wg.Done() for msg := range pc.Messages() { fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n" , msg.Partition, msg.Offset, string (msg.Key), string (msg.Value)) } }(pc) } wg.Wait() consumer.Close() }
我们同时启动生产者和消费者,然后在生产者的terminal输入一些字符串就可以在消费者的terminal看到输出了,如下图所示。