Kafka Spark Integration Issue - Task Not Serializable

When you integrate Kafka with Spark Streaming, there are a few small things that need to be taken care of. They are often ignored, and we end up wasting a lot of time resolving the issues they cause. In this post, I will discuss these small issues with you.

1. Task Not Serializable: If you have tried posting messages from Spark Streaming to Kafka, there is a high probability that you have faced this issue. Let us understand the problem first.
   When does it occur - When you create an object of a class on the driver and use it inside the transformation or output logic of a DStream or RDD.
   Example - Example code is below. If the KafkaSink class is not declared serializable and you use it as below, you will get a Task not serializable exception.

kafkaStream.foreachRDD { rdd =>
  rdd.foreachPartition { part =>
    // kafkaSink is created on the driver; this closure runs on the executors
    part.foreach(msg => kafkaSink.value.send("kafka_out_topic", msg.toString))
  }
}

   Reason - When Spark executes a task, it serializes the task's closure, together with every object that closure references, so that it can ship the task to the executors. Serialization also happens when data is transferred between executors.

   Solution - Make every class whose instances are used on the executors serializable.
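
To see the reason and the solution in isolation, here is a minimal sketch that uses plain Java serialization (the mechanism Spark uses for closures) with neither Spark nor Kafka on the classpath. FakeProducer, PlainSink, LazySink and SerializationCheck are hypothetical names made up for this illustration. FakeProducer only stands in for a real Kafka producer, and the actual KafkaSink class may well look different.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a Kafka producer. It is NOT Serializable,
// much like a real producer that holds open network connections.
class FakeProducer(val brokers: String) {
  def send(topic: String, msg: String): String = s"[$brokers] $topic <- $msg"
}

// Problem case: the sink is not Serializable and holds the producer directly.
class PlainSink(val producer: FakeProducer) {
  def send(topic: String, msg: String): String = producer.send(topic, msg)
}

// One possible fix (an assumption, not necessarily the author's KafkaSink):
// the sink is Serializable and only carries the broker string. The producer
// is transient and lazy, so it is skipped during serialization and created
// on first use in whichever JVM calls send.
class LazySink(brokers: String) extends Serializable {
  @transient lazy val producer: FakeProducer = new FakeProducer(brokers)
  def send(topic: String, msg: String): String = producer.send(topic, msg)
}

object SerializationCheck {
  // Attempts Java serialization and reports whether it succeeded.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

With these definitions, SerializationCheck.isSerializable(new PlainSink(new FakeProducer("localhost:9092"))) returns false, while SerializationCheck.isSerializable(new LazySink("localhost:9092")) returns true. Keeping the producer lazy means only the small configuration value travels to the executors, not the producer itself.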

2. Task Not Serializable for StreamingContext: Imagine a case where you receive input from a stream using Spark Streaming, process that stream on the executors, and then post the result to a Kafka topic. In this case, make sure you have declared the StreamingContext as a transient variable. By marking it transient, you tell Spark to skip the streaming context whenever it serializes a job to send to an executor.
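
The effect of the transient marker can be checked without Spark. The sketch below is only an illustration under assumptions: FakeStreamingContext is a hypothetical stand-in for StreamingContext, and JobWithContext, JobWithTransientContext and TransientCheck are made-up names. It serializes both job objects with plain Java serialization.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for StreamingContext: it lives on the driver only
// and is not Serializable.
class FakeStreamingContext(val appName: String)

// Without @transient, serializing the job also tries to serialize the context.
class JobWithContext(val context: FakeStreamingContext,
                     val outputTopic: String) extends Serializable

// With @transient, the context field is skipped during serialization.
class JobWithTransientContext(@transient val context: FakeStreamingContext,
                              val outputTopic: String) extends Serializable

object TransientCheck {
  // Attempts Java serialization and reports whether it succeeded.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

Here TransientCheck.isSerializable returns false for a JobWithContext and true for a JobWithTransientContext built from the same context. Keep in mind that a transient field is null after deserialization, so the context must not be touched in code that runs on the executors.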

3. Task Not Serializable for DStream: In a situation similar to point 2, if you derive several intermediate variables from a DStream and only the last one is used on the executors, every variable in the chain should still be declared transient.

@transient val kafkaStream = KafkaUtils.createStream(streamingContext, "zookpr-host:2181", "kafka-grp", Map("Kafka_input" -> 1))

// createStream yields (key, message) pairs; split only the message part
val words = kafkaStream.flatMap { case (_, value) => value.split(" ") }
val wc = words.map(x => (x, 1)).reduceByKey(_ + _)


You will face this issue if words and wc are not transient, so the following code is correct.


@transient val kafkaStream = KafkaUtils.createStream(streamingContext, "zookpr-host:2181", "kafka-grp", Map("Kafka_input" -> 1))

// createStream yields (key, message) pairs; split only the message part
@transient val words = kafkaStream.flatMap { case (_, value) => value.split(" ") }
@transient val wc = words.map(x => (x, 1)).reduceByKey(_ + _)

I hope this post is useful for you. Please share your thoughts and comments. You can access the code related to this post on my GitHub.
