scala - What is the right way to have a static object on all workers?
I've been looking at the Spark documentation, and it mentions this:
Spark's API relies heavily on passing functions in the driver program to run on the cluster. There are two recommended ways to do this:
Anonymous function syntax, which can be used for short pieces of code.
Static methods in a global singleton object. For example, you can define object MyFunctions and then pass MyFunctions.func1, as follows:
object MyFunctions { def func1(s: String): String = { ... } }

myRdd.map(MyFunctions.func1)
Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method. For example, consider:
class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
Here, if we create a new MyClass and call doStuff on it, the map inside it references the func1 method of that MyClass instance, so the whole object needs to be sent to the cluster. It is similar to writing rdd.map(x => this.func1(x)).
Now my doubt is what happens if you have attributes on the singleton object (which is supposed to be equivalent to static). Same example with a small alteration:
object MyClass {
  val value = 1
  def func1(s: String): String = { s + value }
}

myRdd.map(MyClass.func1)
So the function is still referenced statically, but how far does Spark go in trying to serialize the referenced variables? Will it serialize value, or will value be initialized again in the remote workers?
Additionally, in this context I have heavy models inside a singleton object, and I would like to find the right way to serialize them to the workers while keeping the ability to reference them from the singleton everywhere, instead of passing them around as function parameters across a pretty deep function call stack.
Any in-depth info on what/how/when Spark serializes things would be appreciated.
This is less a question about Spark and more a question about how Scala generates code. Remember that a Scala object is pretty much a Java class full of static methods. Consider a simple example like this:
object Foo {
  val value = 42

  def func(i: Int): Int = i + value

  def main(args: Array[String]): Unit = {
    println(Seq(1, 2, 3).map(func).sum)
  }
}
That gets translated into 3 Java classes; one of them is the closure that is a parameter to the map method. Using javap on that class yields something like this:
public final class Foo$$anonfun$main$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable {
  public static final long serialVersionUID;
  public final int apply(int);
  public int apply$mcII$sp(int);
  public final java.lang.Object apply(java.lang.Object);
  public Foo$$anonfun$main$1();
}
Note that there are no fields or anything. If you look at the disassembled bytecode, all it does is call the func() method. When running in Spark, this is the instance that will get serialized; since it has no fields, there is not much to be serialized.
As for your question about how to initialize static objects, you can have an idempotent initialization function that you call at the start of your closures. The first call will trigger the initialization, and subsequent calls will be no-ops. Cleanup, though, is a lot trickier, since I'm not familiar with an API that lets you "run this code on all executors".
One approach that can be useful if you need cleanup is explained in this blog post, in the "setup() and cleanup()" section.
EDIT: just for clarification, here's the disassembly of the method that actually makes the call.
public int apply$mcII$sp(int);
  Code:
   0: getstatic     #29; //Field Foo$.MODULE$:LFoo$;
   3: iload_1
   4: invokevirtual #32; //Method Foo$.func:(I)I
   7: ireturn
See how it just references the static field holding the singleton and calls the func() method.
scala apache-spark