1. KafkaProperties
package com.immooc.spark.kafka;
public class KafkaProperties {
public static final String ZK = "localhost:2181";
public static final String TOPIC = "test";
public static final String BROKER_LIST = "localhost:9092";
public static final String GROUP_ID = "test_group1";
}
2. KafkaProducer
package com.immooc.spark.kafka;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
public class KafkaProducer extends Thread{
private String topic;
private Producer<Integer, String> producer;
public KafkaProducer(String topic){
this.topic = topic;
Properties properties = new Properties();
properties.put("metadata.broker.list", KafkaProperties.BROKER_LIST);
properties.put("serializer.class", "kafka.serializer.StringEncoder");
properties.put("request.required.acks", "1");
producer = new Producer<Integer, String>(new ProducerConfig(properties));
}
@Override
public void run(){
int messageNo = 1;
while (true){
String message = "message_" + messageNo;
producer.send(new KeyedMessage<Integer, String>(topic, message));
System.out.println("Send: " + message);
messageNo ++;
try{
Thread.sleep(2000);
}catch (Exception e){
e.printStackTrace();
}
}
}
}
3. KafkaConsumer
package com.immooc.spark.kafka;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class KafkaConsumer extends Thread{
private String topic;
public KafkaConsumer(String topic){
this.topic = topic;
}
private ConsumerConnector createConnector(){
Properties properties = new Properties();
properties.put("zookeeper.connect", KafkaProperties.ZK);
properties.put("group.id", KafkaProperties.GROUP_ID);
return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}
@Override
public void run(){
ConsumerConnector consumer = createConnector();
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = messageStream.get(topic).get(0);
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
while (iterator.hasNext()){
String message = new String(iterator.next().message());
System.out.println("rec:" + message);
}
}
}
4202
原文
http://www.waitingfy.com/archives/4202