Spark Streaming and Kafka Integration

Spark Streaming and Kafka are two widely discussed technologies these days. Kafka is a distributed queue: we can post messages to a Kafka queue and also read messages from it. Spark Streaming can keep listening to a source and process the incoming data.
Let us understand how we can integrate these two technologies. As part of this post, we will learn the following:

1. Read messages from a Kafka queue
2. Write messages to a Kafka queue

Reading messages from a Kafka queue: First of all, we should create a stream which will keep listening to a Kafka topic. Following is the code for this.

import org.apache.spark.streaming.kafka._

// Listen to the "Kafka_input" topic via ZooKeeper, using one receiver thread
@transient val kafkaStream = KafkaUtils.createStream(streamingContext, "zookpr-host:2181", "kafka-grp", Map("Kafka_input" -> 1))
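
The snippet above assumes a StreamingContext named streamingContext already exists. A minimal sketch of creating one (the application name and the five-second batch interval below are just placeholder choices) could look like this:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder app name and batch interval; adjust for your workload
val sparkConf = new SparkConf().setAppName("KafkaSparkStreaming")
val streamingContext = new StreamingContext(sparkConf, Seconds(5))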

There is another way to read messages from Kafka topics, i.e. Kafka Direct. Kafka Direct is out of scope for this post; we will discuss it in another post.

Once kafkaStream is ready, we can start receiving messages and processing them. Once a message is processed, we will want to post it to another Kafka topic. For this we will create a Kafka sink.
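
For example, since createStream returns a DStream of (key, message) pairs, the payloads can be pulled out as shown below (the upper-casing step is only a placeholder for whatever processing you actually need):

// Each element of kafkaStream is a (key, value) pair; keep only the value
val messages = kafkaStream.map { case (_, value) => value }

// Placeholder processing step
val processed = messages.map(_.toUpperCase)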


import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  // Lazily create the producer so only the factory function is serialized
  lazy val prdcr = createProducer()
  def send(topic: String, msg: String): Unit =
    prdcr.send(new ProducerRecord[String, String](topic, msg))
}

import scala.collection.JavaConverters._

// Companion object that builds a KafkaSink from producer config
object KafkaSink {
  def apply(config: Map[String, Object]): KafkaSink = {
    val f = () => {
      val prdcr = new KafkaProducer[String, String](config.asJava)
      // Close the producer cleanly when the executor JVM shuts down
      sys.addShutdownHook {
        prdcr.close()
      }
      prdcr
    }
    new KafkaSink(f)
  }
}
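
The config passed to KafkaSink.apply is the usual Kafka producer configuration. A minimal sketch (the broker address below is a placeholder) might look like this:

// Placeholder broker address; string serializers to match KafkaProducer[String, String]
val config: Map[String, Object] = Map(
  "bootstrap.servers" -> "kafka-broker:9092",
  "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
  "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer"
)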
Once KafkaSink is created, we can broadcast it to all executors and use it there.

val kafkaSink = sc.broadcast(KafkaSink(config))

Once kafkaSink is broadcast, it can be used on all executors. We can now post the DStream messages to the output Kafka queue.

kafkaStream.foreachRDD { rdd =>
  rdd.foreachPartition { part =>
    part.foreach(msg => kafkaSink.value.send("kafka_out_topic", msg.toString))
  }
}

Here, if you notice, I have put @transient in front of the kafkaStream variable. kafkaStream is of type DStream, and DStream is not serializable, so we have to mark it transient.

You can access the code of this application on my GitHub.

Please share your suggestions and comments

Comments

  1. Hi Harjeet,

    I am facing an issue with the collect and print actions in the below code while integrating Spark with Kafka. I am able to build the KafkaRDD, but it fails when I call the collect() action. Could you suggest what could solve the issue?

    val KafkaRDD = KafkaUtils.createRDD[String,String](sc, javamap, offsetRanges, LocationStrategies.PreferConsistent)

    KafkaRDD.collect().foreach(println)

