import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.PairRDDFunctions

// `...` marks values the reader must supply; this snippet is illustrative, not runnable as-is.
val sc: SparkContext = ...

val data = ... // an RDD[(K, V)] of any key-value pairs
val fractions: Map[K, Double] = ... // the fraction of each key's records to include in the sample

// Approximate stratified sample: without replacement, each record is kept independently
// with probability fractions(key), so each key's sample size is only right in expectation.
val approxSample = data.sampleByKey(withReplacement = false, fractions)

// Exact stratified sample: sampleByKeyExact makes additional passes over the RDD so that
// each key's sample size equals math.ceil(numItems * fraction) (with high confidence).
val exactSample = data.sampleByKeyExact(withReplacement = false, fractions)
/**
 * Builds the per-partition sampling function for stratified sampling WITHOUT replacement.
 *
 * Each record is kept when a uniform draw falls below its key's sampling rate. When `exact`
 * is false the rate is simply `fractions(key)`. When `exact` is true, an extra pass over
 * `rdd` replaces those rates with per-key acceptance thresholds so that the sample size per
 * key is exact rather than only correct in expectation.
 *
 * @param rdd       keyed records to sample; only scanned up front when `exact` is true
 * @param fractions sampling fraction per key; must contain every key that occurs in `rdd`
 *                  (a missing key makes the `Map` lookup throw)
 * @param exact     whether to compute exact-size thresholds before sampling
 * @param seed      base seed; partition `idx` reseeds its generator with `seed + idx`
 * @return a function `(partitionIndex, records) => sampledRecords`
 */
def getBernoulliSamplingFunction[K, V](
    rdd: RDD[(K, V)],
    fractions: Map[K, Double],
    exact: Boolean,
    seed: Long): (Int, Iterator[(K, V)]) => Iterator[(K, V)] = {
  val samplingRateByKey: Map[K, Double] =
    if (exact) {
      // Compute how many samples to accept immediately and build a waitlist per stratum.
      // NOTE(review): `false` / `None` appear to select the without-replacement path, which
      // needs no per-key counts — confirm against getAcceptanceResults.
      val finalResult = getAcceptanceResults(rdd, false, fractions, None, seed)
      // Per-key acceptance thresholds that yield the exact sample size.
      computeThresholdByKey(finalResult, fractions)
    } else {
      fractions
    }

  (idx: Int, iter: Iterator[(K, V)]) => {
    // Reseed per partition so each partition's draws are reproducible from `seed`.
    val rng = new RandomDataGenerator()
    rng.reSeed(seed + idx)
    // Must use the same invoke pattern on the rng as in getSeqOp for without replacement
    // in order to generate the same sequence of random numbers when creating the sample
    iter.filter(t => rng.nextUniform() < samplingRateByKey(t._1))
  }
}
// Exact stratified sampling WITH replacement. This fragment appears to be the body of an
// enclosing sampling method that is not shown; `rdd`, `fractions`, `seed`, `K` and `V` come
// from that enclosing scope.

// Extra pass over the data: number of records per key.
val counts = Some(rdd.countByKey())
// Compute how many samples to accept immediately and generate a waitlist for each stratum.
val finalResult = getAcceptanceResults(rdd, true, fractions, counts, seed)
// Determine the per-key threshold for waitlisted items so the sample size comes out exact.
val thresholdByKey = computeThresholdByKey(finalResult, fractions)
(idx: Int, iter: Iterator[(K, V)]) => {
  // Reseed per partition so each partition's draws are reproducible from `seed`.
  val rng = new RandomDataGenerator()
  rng.reSeed(seed + idx)
  iter.flatMap { item =>
    val key = item._1
    val acceptBound = finalResult(key).acceptBound
    // Must use the same invoke pattern on the rng as in getSeqOp for with replacement
    // in order to generate the same sequence of random numbers when creating the sample
    // (hence no Poisson draw at all when acceptBound is 0).
    val copiesAccepted = if (acceptBound == 0) 0L else rng.nextPoisson(acceptBound)
    // Waitlisted copies; each one is kept only if its uniform draw is below the key's threshold.
    val copiesWaitlisted = rng.nextPoisson(finalResult(key).waitListBound)
    val copiesInSample = copiesAccepted +
      (0 until copiesWaitlisted).count(i => rng.nextUniform() < thresholdByKey(key))
    // With replacement, a record is emitted once per sampled copy, so it may repeat.
    if (copiesInSample > 0) {
      Iterator.fill(copiesInSample.toInt)(item)
    } else {
      Iterator.empty
    }
  }
}
伯努利抽样 (Bernoulli sampling)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/**
 * Builds the per-partition sampling function for stratified sampling WITHOUT replacement.
 *
 * Each record is kept when a uniform draw falls below its key's sampling rate. When `exact`
 * is false the rate is simply `fractions(key)`. When `exact` is true, an extra pass over
 * `rdd` replaces those rates with per-key acceptance thresholds so that the sample size per
 * key is exact rather than only correct in expectation. The extra pass is skipped when
 * `exact` is false.
 *
 * @param rdd       keyed records to sample; only scanned up front when `exact` is true
 * @param fractions sampling fraction per key; must contain every key that occurs in `rdd`
 *                  (a missing key makes the `Map` lookup throw)
 * @param exact     whether to compute exact-size thresholds before sampling
 * @param seed      base seed; partition `idx` reseeds its generator with `seed + idx`
 * @return a function `(partitionIndex, records) => sampledRecords`
 */
def getBernoulliSamplingFunction[K, V](
    rdd: RDD[(K, V)],
    fractions: Map[K, Double],
    exact: Boolean,
    seed: Long): (Int, Iterator[(K, V)]) => Iterator[(K, V)] = {
  val samplingRateByKey: Map[K, Double] =
    if (exact) {
      // Compute how many samples to accept immediately and build a waitlist per stratum.
      // NOTE(review): `false` / `None` appear to select the without-replacement path, which
      // needs no per-key counts — confirm against getAcceptanceResults.
      val finalResult = getAcceptanceResults(rdd, false, fractions, None, seed)
      // Determine the acceptance threshold per key so the sample size comes out exact.
      computeThresholdByKey(finalResult, fractions)
    } else {
      fractions
    }

  (idx: Int, iter: Iterator[(K, V)]) => {
    // Reseed per partition so each partition's draws are reproducible from `seed`.
    val rng = new RandomDataGenerator()
    rng.reSeed(seed + idx)
    // Must use the same invoke pattern on the rng as in getSeqOp for without replacement
    // in order to generate the same sequence of random numbers when creating the sample
    iter.filter(t => rng.nextUniform() < samplingRateByKey(t._1))
  }
}