java - Understanding Spark's closures and their serialization
Disclaimer: I'm just starting to play with Spark.
I'm having trouble understanding the famous "Task not serializable" exception, but my question is a little different from the ones I see on SO (or so I think).
I have a tiny custom RDD (TestRDD). It has a field which stores objects whose class does not implement Serializable (NonSerializable). I've set the "spark.serializer" config option to use Kryo. However, when I try count() on my RDD, I get the following:
Caused by: java.io.NotSerializableException: com.complexible.spark.NonSerializable
Serialization stack:
    - object not serializable (class: com.test.spark.NonSerializable, value: com.test.spark.NonSerializable@2901e052)
    - field (class: com.test.spark.TestRDD, name: mNS, type: class com.test.spark.NonSerializable)
    - object (class com.test.spark.TestRDD, TestRDD[1] at RDD at TestRDD.java:28)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (TestRDD[1] at RDD at TestRDD.java:28,<function2>))
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1009)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)
When I look inside DAGScheduler.submitMissingTasks I see that it uses its closure serializer on my RDD, which is the Java serializer, not the Kryo serializer I'd expect. I've read that Kryo has issues serializing closures and that Spark always uses the Java serializer for closures, but I don't quite understand how closures come into play here at all. All I'm doing is this:
SparkConf conf = new SparkConf()
        .setAppName("ScanTest")
        .setMaster("local")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

JavaSparkContext sc = new JavaSparkContext(conf);
TestRDD rdd = new TestRDD(sc.sc());
System.err.println(rdd.count());
That is, no mappers or anything else that would require serialization of closures. On the other hand, this works:
sc.parallelize(Arrays.asList(new NonSerializable(), new NonSerializable())).count()
The Kryo serializer is used as expected there and the closure serializer is not involved. If I didn't set the serializer property to Kryo, I'd get an exception here as well.
I'd appreciate any pointers explaining where the closure comes from and how to ensure that I can use Kryo to serialize custom RDDs.
UPDATE: Here's TestRDD with its non-serializable field mNS:
import java.util.Arrays;
import java.util.Collections;

import org.apache.spark.Dependency;
import org.apache.spark.Partition;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;

import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import scala.reflect.ClassManifestFactory$;
import scala.reflect.ClassTag;

class TestRDD extends RDD<String> {

    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);

    NonSerializable mNS = new NonSerializable();

    public TestRDD(final SparkContext _sc) {
        super(_sc,
              JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()),
              STRING_TAG);
    }

    @Override
    public Iterator<String> compute(final Partition thePartition, final TaskContext theTaskContext) {
        return JavaConverters.asScalaIteratorConverter(Arrays.asList("test_" + thePartition.index(),
                                                                     "test_" + thePartition.index(),
                                                                     "test_" + thePartition.index()).iterator()).asScala();
    }

    @Override
    public Partition[] getPartitions() {
        return new Partition[] {new TestPartition(0), new TestPartition(1), new TestPartition(2)};
    }

    static class TestPartition implements Partition {
        final int mIndex;

        public TestPartition(final int theIndex) {
            mIndex = theIndex;
        }

        public int index() {
            return mIndex;
        }
    }
}
"When I look inside DAGScheduler.submitMissingTasks I see that it uses its closure serializer on my RDD, which is the Java serializer, not the Kryo serializer I'd expect."
SparkEnv supports two serializers. One, named serializer, is used for serialization of your data, checkpointing, messaging between workers, and so on; it is available under the spark.serializer configuration flag. The other is called closureSerializer, under spark.closure.serializer, and is used to check that your object is in fact serializable and to serialize tasks. It was configurable in Spark <= 1.6.2 (although nothing other than JavaSerializer actually works there) and is hardcoded to JavaSerializer in 2.0.0 and above.
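A quick way to see that distinction at runtime is a minimal sketch like the one below (my own, not part of the original answer; it assumes a local-mode driver on a 1.6.x/2.x build where SparkEnv.get() is reachable). The data serializer follows spark.serializer, while the closure serializer stays a JavaSerializer regardless of the setting.

import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.api.java.JavaSparkContext;

public class SerializerCheck {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("SerializerCheck")
                .setMaster("local")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Honours spark.serializer, so this prints a KryoSerializer here...
        System.err.println("data serializer:    " + SparkEnv.get().serializer().getClass());
        // ...while this prints a JavaSerializer no matter what was configured.
        System.err.println("closure serializer: " + SparkEnv.get().closureSerializer().getClass());

        sc.stop();
    }
}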
The Kryo closure serializer has a bug which makes it unusable; you can see that bug under SPARK-7708 (this may be fixed with Kryo 3.0.0, but Spark is currently fixed to a specific version of Chill, which is fixed on Kryo 2.2.1). Furthermore, as of Spark 2.0.x the JavaSerializer is fixed instead of configurable (you can see it in this pull request). This means that we're stuck with the JavaSerializer for closure serialization.

Is it weird that we're using one serializer to submit tasks and another to serialize data between the workers and such? Definitely, but this is what we have.
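A practical consequence (my addition, not part of the original answer) is that a custom RDD like TestRDD has to survive plain Java serialization, because the RDD instance itself travels inside the task that the closure serializer writes out. A common workaround, sketched below under that assumption, is to keep the non-serializable state out of the serialized form: mark the field transient and rebuild it lazily on the executor. The class name TransientFieldRDD is hypothetical, and the sketch reuses TestPartition from the TestRDD above.

import java.util.Arrays;
import java.util.Collections;

import org.apache.spark.Dependency;
import org.apache.spark.Partition;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;

import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import scala.reflect.ClassManifestFactory$;
import scala.reflect.ClassTag;

class TransientFieldRDD extends RDD<String> {

    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);

    // transient: skipped by the JavaSerializer when the task is written,
    // so the NonSerializable instance never has to be serialized at all
    private transient NonSerializable mNS;

    public TransientFieldRDD(final SparkContext theContext) {
        super(theContext,
              JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()),
              STRING_TAG);
    }

    private NonSerializable ns() {
        if (mNS == null) {
            mNS = new NonSerializable();  // recreated on whichever JVM runs the task
        }
        return mNS;
    }

    @Override
    public Iterator<String> compute(final Partition thePartition, final TaskContext theTaskContext) {
        ns();  // safe to touch the non-serializable state here
        return JavaConverters.asScalaIteratorConverter(
                Arrays.asList("test_" + thePartition.index()).iterator()).asScala();
    }

    @Override
    public Partition[] getPartitions() {
        return new Partition[] {new TestRDD.TestPartition(0)};
    }
}

Note that this does not make Kryo serialize the RDD; it simply keeps the JavaSerializer from ever seeing the problematic field.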
To sum it up: if you're setting the spark.serializer configuration, or using SparkConf.registerKryoClasses, you'll be utilizing Kryo for most of your serialization in Spark. Having said that, for checking whether a given class is serializable and for serialization of tasks to the workers, Spark will use the JavaSerializer.
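To illustrate that split (my own example; the record class MyRecord is hypothetical), registering classes with Kryo only affects the data path. Task submission still goes through the JavaSerializer, so if MyRecord were a field of TestRDD you would still get the NotSerializableException from the stack trace above.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class KryoRegistrationExample {

    // Deliberately not Serializable: with Kryo set as spark.serializer, the
    // parallelized data still round-trips (as with NonSerializable in the
    // question), and registering the class lets Kryo encode it compactly.
    public static class MyRecord {
        public int value;
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("KryoRegistrationExample")
                .setMaster("local")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(new Class<?>[]{MyRecord.class});

        JavaSparkContext sc = new JavaSparkContext(conf);
        System.err.println(sc.parallelize(Arrays.asList(new MyRecord(), new MyRecord())).count());
        sc.stop();
    }
}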