问题引出
python map 带for循环的状况 兴许 很easy! 但 spark 的 mapPartitions 晕了吗? 其实这两种是一样的场景。。。
注释试验(可疏忽,直奔后果)
为了不便,用把组装的数据类型灌入 map 来模仿 mapPartitions
性能是模拟计算 tf-idf
class Article: '''文章类''' def __init__(self,id, indexex, tfidfs): self.id = id self.indexex = indexex # 文章分词后的所有词索引列表 self.tfidfs = tfidfs # 每个词对应的TF-IDF值 列表 def f(partition): for row in partition: # row 代表每个文章 # row.indexex 代表 文章分词后的所有词索引列表 # row.tfidfs 代表 每个词对应的TF-IDF值 列表 word_list = list(zip(row.indexex, row.tfidfs)) for index, tfidf in word_list: # 遍历 "每个"词语 的 index与tfidf ########### 这里 yield 是重点 ########### yield f'文章{row.id}', index, tfidf c = map(f, [ # <-为了模仿分区,这一层的列表代表partition [ # <- 这一层模仿的是每个分区外面的文章列表 Article(0, [1,2],[0.1,0.4] ), # <-文章0 Article(1, [3,4],[3.4,3.7] ) # <-文章1 ] ] ) ######################## 执行 ######################## for x in c: # 解zip print(list(x)) # 解yield
后果:
如果应用 return 关键词,得出的最终打印后果: (不满足)
['文章0', 1, 0.1]
如果应用 yield 关键词,得出的最终打印后果: (满足)
[('文章0', 1, 0.1), ('文章0', 2, 0.4), ('文章1', 3, 3.4), ('文章1', 4, 3.7)]
这里就呈现了一个问题:
失常用法都是用 return,时罕用 lambda(lambda默认也是隐式 return。)
是何原因让咱们不得不用 yield?
一点一点往下推:
map: 外围是 "按单个数据映射" mapPartition: 外围是"把数据分组,按组映射" 按组映射是没错,但咱们的目标是想操作组内的每条数据。 所以咱们必须须要每次对组内数据 for循环遍历进去独自解决。 而后 返回回去。
那咱们先用失常的 return 返回试试:
def(partation): for x in partition: return x.name, x.age
兴许看到这里你感觉没什么问题。。。
然而不要忘了最根底的内容, return 是间接跳出 for 循环和函数的。
再次强调,mapPartition是按组映射,所以认真看下面代码:
最终的mapPartition是按组映射后果就是:
每组的第一个元素的汇合 (因为for被return了,每组的函数也被return了)
解决这种问题,有两种形式:
- 最简略将源代码for循环外部的 return 改为 yield
- 新建长期列表过渡,return放在for里面,如下案例:
def f(partition): _ = [] for x in partition: _.append(x) return _ b = [[1,2,3], [4,5,6]] c = map(f,b) print(list(c))
map + yield: