
scala - How to use cache() correctly?




I'm using Spark 1.1.0 and trying to load a graph with GraphX. The relevant fraction of the code looks like the following:

class="lang-scala prettyprint-override">val distinct = context.union(r1, r2).distinct; distinct.cache() val zipped = distinct.zipwithuniqueid zipped.cache distinct.unpersist(false)

When I'm executing this on the cluster, the first stage that is executed is:

class="lang-scala prettyprint-override">distinct @ test.scala:72

But after the operation has finished, I can't see any entry in the "Storage" tab of the Spark UI. The next stage is:

class="lang-scala prettyprint-override">zipwithuniqueid @ test.scala:78

But right after it, this starts again:

class="lang-scala prettyprint-override">distinct @ test.scala:72

Shouldn't the result be cached? Is it useful to cache an RDD at all if it is only used once?

Edit:

I forgot to mention that I get a fetch failure at zipWithUniqueId at test.scala:78.

Possible solution for the fetch problem:

Possible solutions are described here; it is a bug in Spark version 1.1.0.

There are also possible solutions from Andrew Ash on the spark-user mailing list:

There seem to be 3 things causing FetchFailures in 1.1:

1) Long GCs on an executor (longer than spark.core.connection.ack.wait.timeout, which defaults to 60 sec; see the sketch after this list)

2) Too many open files (hitting kernel limits on ulimit -n)

3) An undetermined issue being tracked in a ticket
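For cause 1), one mitigation is to raise the ack timeout so that a long GC pause is not mistaken for a dead executor. A minimal sketch, assuming you build your own SparkContext; the application name and the 600-second value are made-up examples, not recommendations from the list:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical mitigation for cause 1): allow more time for an executor
// to acknowledge before the connection is considered failed (in seconds)
val conf = new SparkConf()
  .setAppName("graph-load") // made-up application name
  .set("spark.core.connection.ack.wait.timeout", "600") // default: 60
val sc = new SparkContext(conf)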

Source

cache is applied the first time an RDD is evaluated. This means that, to be effective, the cache call should precede an action on an RDD that you will use more than once. Since cache only takes effect on RDD evaluation, if you have a linear RDD lineage that is executed exactly once, caching will occupy memory without delivering any advantage.
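As a minimal made-up sketch of where cache does pay off, here is an RDD that two separate actions evaluate:

// "parsed" is evaluated by two actions, so caching it avoids
// recomputing the map lineage for the second one
val parsed = sc.parallelize(1 to 1000).map(i => (i, i * i)).cache()
val total = parsed.count() // first action: evaluates parsed and fills the cache
val sample = parsed.take(5) // second action: reads the cached blocks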

So, if your pipeline is:

val distinct = context.union(r1, r2).distinct
val zipped = distinct.zipWithUniqueId
zipped.cache

then using cache between distinct and zipped is not of much use, unless you need to access the distinct data once again further down the road. Given that you're unpersisting it right after, that makes me think otherwise.
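That may also explain why nothing showed up in the "Storage" tab: no action runs between distinct.cache() and distinct.unpersist(false), so distinct is never actually materialized into the cache. A sketch of one possible ordering, assuming you really do want distinct cached while zipped is built; the count() here is an illustrative stand-in for whatever action you actually run:

val distinct = context.union(r1, r2).distinct
distinct.cache()
val zipped = distinct.zipWithUniqueId
zipped.cache()
zipped.count() // an action: materializes zipped, reading distinct from the cache
distinct.unpersist(false) // safe to drop now; zipped has its own cached blocks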

In a nutshell, use .cache only if the evaluated RDD is used more than once (e.g. an iterative algorithm, a lookup, ...).
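As an illustration of the iterative case, a made-up example in the same spark-shell style as the one below (the update rule is arbitrary; the point is that every pass re-evaluates points):

val points = sc.parallelize(1 to 100000).map(_.toDouble).cache()
var estimate = 0.0
for (_ <- 1 to 10) {
  // mean() is an action, so each pass re-evaluates points;
  // with cache() the parallelize + map lineage runs only once
  estimate += points.map(p => p - estimate).mean() * 0.1
}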

An example of cache in the spark-shell:

val rdd = sc.makeRDD(1 to 1000)
val cached = rdd.cache // at this point, nothing is shown in the UI

cached.count // at this point, you can see "cached" in the UI
// res0: Long = 1000

val zipped = cached.zipWithUniqueId
val zipCache = zipped.cache // once again, nothing new on the UI
zipCache.first // the first action, and it triggers the RDD evaluation

cached.unpersist(blocking = true) // force an immediate unpersist

scala apache-spark
