Thursday, 15 July 2010

scala - What is the right way to have a static object on all workers




I've been looking at the Spark documentation, and it mentions this:

Spark's API relies heavily on passing functions in the driver program to run on the cluster. There are two recommended ways to do this:

- Anonymous function syntax, which can be used for short pieces of code.
- Static methods in a global singleton object. For example, you can define object MyFunctions and then pass MyFunctions.func1, as follows:

    object MyFunctions {
      def func1(s: String): String = { ... }
    }

    myRdd.map(MyFunctions.func1)

Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method. For example, consider:

    class MyClass {
      def func1(s: String): String = { ... }
      def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
    }

Here, if we create a new MyClass and call doStuff on it, the map inside there references the func1 method of that MyClass instance, so the whole object needs to be sent to the cluster. It is similar to writing rdd.map(x => this.func1(x)).
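The difference between the two cases can be observed without a cluster, using plain Java serialization as a rough stand-in for what happens when a closure is shipped. This is only a sketch under assumptions: CaptureDemo and its serializes helper are hypothetical names, MyClass is deliberately left non-serializable so the capture becomes visible, and Spark's own closure handling does more than this.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Singleton: calling its method from a closure captures nothing.
object MyFunctions {
  def func1(s: String): String = s.toUpperCase
}

// Deliberately NOT Serializable, so capturing an instance is detectable.
class MyClass {
  def func1(s: String): String = s.toUpperCase
  // Equivalent to x => this.func1(x): the closure holds a reference to `this`.
  def closure: String => String = s => func1(s)
}

// Hypothetical helper object, for illustration only.
object CaptureDemo {
  val fromSingleton: String => String = s => MyFunctions.func1(s)
  val fromInstance: String => String = new MyClass().closure

  // True if plain Java serialization accepts the value.
  def serializes(value: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(value)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Both closures behave the same when called locally; only the one built from the instance drags the enclosing object into the serialized form, which is why it fails here.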

Now my doubt is what happens if you have attributes on the singleton object (which are supposed to be equivalent to static). Here is the same example with a small alteration:

    object MyClass {
      val value = 1
      def func1(s: String): String = { s + value }
    }

    myRdd.map(MyClass.func1)

So the function is still referenced statically, but how far does Spark go in trying to serialize all referenced variables? Will it serialize value, or will it be initialized again in the remote workers?

Additionally, this is all in the context that I have some heavy models inside a singleton object, and I would like to find the correct way to serialize them to the workers while keeping the ability to reference them from the singleton everywhere, instead of passing them around as function parameters across a pretty deep function call stack.

Any in-depth information on what/how/when Spark serializes things would be appreciated.

This is less a question about Spark and more a question of how Scala generates code. Remember that a Scala object is pretty much a Java class full of static methods. Consider a simple example like this:

    object Foo {
      val value = 42
      def func(i: Int): Int = i + value
      def main(args: Array[String]): Unit = {
        println(Seq(1, 2, 3).map(func).sum)
      }
    }

That will be translated to 3 Java classes; one of them will be the closure that is the parameter to the map method. Using javap on that class yields something like this:

    public final class Foo$$anonfun$main$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable {
      public static final long serialVersionUID;
      public final int apply(int);
      public int apply$mcII$sp(int);
      public final java.lang.Object apply(java.lang.Object);
      public Foo$$anonfun$main$1();
    }

Note there are no fields or anything. If you look at the disassembled bytecode, all it does is call the func() method. When running in Spark, this is the instance that will get serialized; since it has no fields, there's not much to be serialized.
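One way to check this locally is to serialize such a closure with plain Java serialization and look for the singleton's state in the resulting bytes. The sketch below rests on assumptions: StatefulFoo, PayloadDemo and the marker string are hypothetical names chosen for illustration, and Java serialization is only an approximation of what Spark ships.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.charset.StandardCharsets

// Singleton with a distinctive value that would be easy to spot in a payload.
object StatefulFoo {
  val value: String = "SINGLETON-STATE-MARKER"
  def func(s: String): String = s + value
}

// Hypothetical helper object, for illustration only.
object PayloadDemo {
  // The closure only calls the singleton's method; it holds no fields.
  val closure: String => String = s => StatefulFoo.func(s)

  // Serializes an object with plain Java serialization.
  def toBytes(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    buffer.toByteArray
  }

  // Searches the raw bytes for an ASCII string.
  def payloadContains(bytes: Array[Byte], text: String): Boolean =
    new String(bytes, StandardCharsets.ISO_8859_1).contains(text)
}
```

If the marker is absent from the bytes, the value did not travel with the closure, so a remote JVM would initialize the singleton itself the first time the object is touched there.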

As for your question of how to initialize static objects, you can have an idempotent initialization function that you call at the start of your closures. The first call will trigger initialization; the subsequent calls will be no-ops. Cleanup, though, is a lot trickier, since I'm not familiar with an API that does something like "run this code on all executors".

One approach that can be useful if you need cleanup is explained in this blog, in the "setup() and cleanup()" section.
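A minimal sketch of that idempotent initialization, assuming the heavy model can be built from inside the singleton itself. HeavyModel, Models, Scoring and the load counter are hypothetical names for illustration, and the tiny in-memory map stands in for whatever expensive loading the real model needs.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for an expensive model.
final class HeavyModel(weights: Map[String, Double]) {
  def score(s: String): Double = weights.getOrElse(s, 0.0)
}

object Models {
  // Counts how often the loader actually runs; for illustration only.
  val loadCount = new AtomicInteger(0)

  // A lazy val is initialized at most once per JVM, on first access.
  lazy val model: HeavyModel = {
    loadCount.incrementAndGet()
    new HeavyModel(Map("a" -> 1.0, "b" -> 2.0))
  }

  // Idempotent: the first call triggers loading, later calls are no-ops.
  def ensureInitialized(): Unit = {
    val _ = model
  }
}

object Scoring {
  // Function meant to be passed to map; it references the singleton
  // directly instead of receiving the model as a parameter.
  def score(s: String): Double = {
    Models.ensureInitialized()
    Models.model.score(s)
  }
}
```

With Spark this would be used as myRdd.map(Scoring.score). Each executor JVM would then build its own copy of the model on first use, rather than receiving a serialized one from the driver.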

Edit: just for clarification, here's the disassembly of the method that actually makes the call.

    public int apply$mcII$sp(int);
      Code:
       0: getstatic #29; //Field Foo$.MODULE$:LFoo$;
       3: iload_1
       4: invokevirtual #32; //Method Foo$.func:(I)I
       7: ireturn

See how it just references the static field holding the singleton and calls the func() method.

scala apache-spark
