spark streaming - The missing records of Kafka consumer -
there problem between kafka , spark-streaming, have low-level traffic (around 12000 - 15000 records / per second) services in production, @ first, consuming traffic seems normal, after 10 - 15 mins, speed of consuming 1/10 left. might network's traffic problem?
configurations of kafka: num.network.threads=2 num.io.threads=8 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 log.flush.interval.messages=10000 log.flush.interval.ms=1000 log.retention.hours=12 log.segment.bytes=536870912 log.retention.check.interval.ms=60000 log.cleaner.enable=false log.cleanup.interval.mins=1
configurations of spark-streaming (consumer):
.... val kafkaparams = map( "zookeeper.connect" -> zkquorum, "group.id" -> group, "zookeeper.connection.timeout.ms" -> "1000000", "zookeeper.sync.time.ms" -> "200", "fetch.message.max.bytes" -> "2097152000", "queued.max.message.chunks" -> "1000", "auto.commit.enable" -> "true", "auto.commit.interval.ms" -> "1000") seek { kafkautils.createstream[string, string, stringdecoder, stringdecoder]( ssc, kafkaparams, topics.map((_, partition)).tomap, storagelevel.memory_only).map { case (key, value) => convertto(key, value) }.filter { _ != null }.foreachrdd(line => savetohbase(line, input_table)) //}.foreachrdd(line => logger.info("handling testing....."+ line)) } grab { case e: exception => logger.error("consumerex: " + e.printstacktrace) }
it might gc pause time, maybe. check this: http://ingest.tips/2015/01/21/handling-large-messages-kafka/
apache-kafka spark-streaming
No comments:
Post a Comment