更新时间:2019年07月29日 15时44分32秒 来源:黑马程序员论坛
Spark 从 Kafka 读数并发问题 经常使用 Apache Spark从Kafka读数的同学肯定会遇到这样的问题:某些Spark分区已经处理完数据了,另一部分分区还在处理数据,从而导致这个批次的作业总消耗时间变长;甚至导致 Spark 作业无法及时消费Kafka中的数据。为了简便起见,本文讨论的 Spark Direct 方式读取Kafka中的数据,这种情况下 Spark RDD 中分区和 Kafka 分区是一一对应的,更多的细节请参见官方文档,这里就不介绍。那么有没有办法解决这个问题呢?我们先来看看社区是咋解决这个问题。 也就是修改了 KafkaRDD 类的 getPartitions 方法: 原实现: [Scala] 纯文本查看 复制代码 override def getPartitions: Array[Partition] = { offsetRanges.zipWithIndex.map { case (o, i) => val (host, port) = leaders(TopicAndPartition(o.topic, o.partition)) new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port) }.toArray } 修改后的实现: [Scala] 纯文本查看 复制代码 override def getPartitions: Array[Partition] = { val subconcurrency = if (kafkaParams.contains("topic.partition.subconcurrency")) kafkaParams.getOrElse("topic.partition.subconcurrency","1").toInt else 1 val numPartitions = offsetRanges.length val subOffsetRanges: Array[OffsetRange] = new Array[OffsetRange](subconcurrency * numPartitions) for (i <- 0 until numPartitions) { val offsetRange = offsetRanges(i) val step = (offsetRange.untilOffset - offsetRange.fromOffset) / subconcurrency var from = -1L var until = -1L for (j <- 0 until subconcurrency) { from = offsetRange.fromOffset + j * step until = offsetRange.fromOffset + (j + 1) * step -1 if (j == subconcurrency) { until = offsetRange.untilOffset } subOffsetRanges(i * subconcurrency + j) = OffsetRange.create(offsetRange.topic, offsetRange.partition, from, until) } } subOffsetRanges.zipWithIndex.map{ case (o, i) => val (host, port) = leaders(TopicAndPartition(o.topic, o.partition)) new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port) }.toArray } 这个方法的实现思想还是很简单的,就是通过设置 topic.partition.subconcurrency 参数,如果这个参数等于1,整个函数的执行效果和之前一样。但是如果这个参数大于1,则之前一个 Kafka 分区由一个 Spark 分区消费的数据变成由 topic.partition.subconcurrency 个 Spark 分区去消费,每个 Spark 分区消费的数据量相等。这个无疑会加快 Kafka 数据的消费,但是这种方法的问题也很明显:
到目前为止,上述 PR 被关闭,而且Spark-22056 一直处于 IN PROGRESS 状态,我猜这个最后可能也会被关闭掉。那除了上面实现,我们还有其他实现吗?当然有,我们可以在处理数据之前通过 repartition 或 coalease 对数据进行重分区: [Scala] 纯文本查看 复制代码 val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics).repartition(xxx).mapPartition(xxx) 这种方法的好处是,对同一类型的数据,先后顺序是不会乱的,因为同一类型的数据经过重分区最后还是会分发到同一个分区里面的。但是这个方法的使用前提是数据重分区+后续处理的时间比没有重分区直接处理数据的时间要短,否则重分区的开销过大导致总的处理时间过长那就没意义了。 当然,我们可以可以通过在 RDD#mapPartitions 里面创建多个线程来处理同一个 RDD 分区里面的数据。但是上面两种方法无法解决 Kafka 端数据倾斜导致的数据处理过慢的问题(也就是有些分区数据量相比其他分区大很多,光是从这些分区消费数据的时间就比其他分区要长很多)。针对这种情况,我们需要考虑 Kafka 分区设置是否合理?是否能够通过修改 Kafka 分区的实现来解决数据倾斜的问题。 如果不是 Kafka 数据倾斜导致的数据处理过慢的问题,而是所有 Kafka 分区的整体数据量就比较大,那这种情况我们可以考虑是否可以增加 Kafka 分区数?是否需要增加 Spark 的处理资源等。建议最好还是别使用多个线程处理同一个 Kafka 分区里面的数据。 转载自过往记忆(https://www.iteblog.com/) |
推荐了解热门学科
java培训 | Python人工智能 | Web前端培训 | PHP培训 |
区块链培训 | 影视制作培训 | C++培训 | 产品经理培训 |
UI设计培训 | 新媒体培训 | 软件测试培训 | Linux运维 |
大数据培训 | 智能机器人软件开发 |
传智播客是一家致力于培养高素质软件开发人才的科技公司,“黑马程序员”是传智播客旗下高端IT教育品牌。自“黑马程序员”成立以来,教学研发团队一直致力于打造精品课程资源,不断在产、学、研3个层面创新自己的执教理念与教学方针,并集中“黑马程序员”的优势力量,针对性地出版了计算机系列教材50多册,制作教学视频数+套,发表各类技术文章数百篇。
传智播客从未停止思考
传智播客副总裁毕向东在2019IT培训行业变革大会提到,“传智播客意识到企业的用人需求已经从初级程序员升级到中高级程序员,具备多领域、多行业项目经验的人才成为企业用人的首选。”
中级程序员和初级程序员的差别在哪里?
项目经验。毕向东表示,“中级程序员和初级程序员最大的差别在于中级程序员比初级程序员多了三四年的工作经验,从而多出了更多的项目经验。“为此,传智播客研究院引进曾在知名IT企业如阿里、IBM就职的高级技术专家,集中研发面向中高级程序员的课程,用以满足企业用人需求,尽快补全IT行业所需的人才缺口。
何为中高级程序员课程?
传智播客进行了定义。中高级程序员课程,是在当前主流的初级程序员课程的基础上,增加多领域多行业的含金量项目,从技术的广度和深度上进行拓展。“我们希望用5年的时间,打造上百个高含金量的项目,覆盖主流的32个行业。”传智播客课程研发总监于洋表示。
黑马程序员热门视频教程
Python入门教程完整版(懂中文就能学会) | 零起点打开Java世界的大门 |
C++| 匠心之作 从0到1入门学编程 | PHP|零基础入门开发者编程核心技术 |
Web前端入门教程_Web前端html+css+JavaScript | 软件测试入门到精通 |