Sunday 15 April 2012

parallel processing - How to process chunks of a file with java.util.stream



To become familiar with the Stream API, I tried to code a quite simple pattern.

Problem: I have a text file containing non-nested blocks of text. The blocks are identified by start/end patterns (e.g. <start> and <stop>). The content of a block is not syntactically distinguishable from the noise between the blocks, hence it is impossible to work with simple (stateless) lambdas.
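
For illustration, such a file might look like this (the contents here are made up, not taken from the original post):

    some noise that does not belong to any block
    <start>
    first line of block one
    second line of block one
    <stop>
    more noise between the blocks
    <start>
    the only line of block two
    <stop>
    trailing noise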

I was only able to implement something ugly like:

Files.lines(path).collect(new MySequentialParseAndProcessEachLineCollector<>());

To be honest, this is not what I want.
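
For context, a minimal sketch of what such a stateful collector could look like (the original post does not show its implementation, so the class name and all details here are hypothetical). The mutable per-instance state is exactly what makes it ugly and forces sequential processing:

import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collector;

// Hypothetical sketch of the "ugly" sequential collector alluded to above.
class SequentialChunkCollector
        implements Collector<String, List<List<String>>, List<List<String>>> {

    private final Predicate<String> start, end;
    private List<String> current; // mutable state shared across accumulator calls

    SequentialChunkCollector(Predicate<String> start, Predicate<String> end) {
        this.start = start;
        this.end = end;
    }

    public Supplier<List<List<String>>> supplier() { return ArrayList::new; }

    public BiConsumer<List<List<String>>, String> accumulator() {
        return (chunks, line) -> {
            if (current == null) {
                if (start.test(line)) current = new ArrayList<>();
            } else if (end.test(line)) {
                chunks.add(current); // chunk complete
                current = null;
            } else {
                current.add(line);
            }
        };
    }

    public BinaryOperator<List<List<String>>> combiner() {
        // the shared state makes a meaningful merge impossible
        return (a, b) -> { throw new UnsupportedOperationException("sequential only"); };
    }

    public Function<List<List<String>>, List<List<String>>> finisher() {
        return Function.identity();
    }

    public Set<Characteristics> characteristics() {
        return EnumSet.of(Characteristics.IDENTITY_FINISH);
    }
}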

I am looking for a mapper, something like:

Files.lines(path).map(new MyMapAllLinesOfBlockToBuckets()).parallelStream().collect(new MyProcessOneBucketCollector<>());

The question "Is there a way to extract chunks of data from a Java 8 stream?" seems to contain the skeleton of a solution. Unfortunately, I'm too stupid to translate that to my problem. ;-)

Any hints?

Here is a solution which can be used for converting a Stream<String>, each element representing a line, into a Stream<List<String>>, each element representing a chunk found using a specified delimiter:

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ChunkSpliterator implements Spliterator<List<String>> {
    private final Spliterator<String> source;
    private final Predicate<String> start, end;
    private final Consumer<String> getChunk;
    private List<String> current;

    ChunkSpliterator(Spliterator<String> lineSpliterator,
                     Predicate<String> chunkStart, Predicate<String> chunkEnd) {
        source = lineSpliterator;
        start = chunkStart;
        end = chunkEnd;
        // collects lines into the current chunk; a new chunk begins when the
        // start pattern matches while no chunk is in progress
        getChunk = s -> {
            if(current != null)
                current.add(s);
            else if(start.test(s))
                current = new ArrayList<>();
        };
    }

    public boolean tryAdvance(Consumer<? super List<String>> action) {
        // pull lines from the source until the current chunk ends with a
        // line matching the end pattern
        while(current == null || current.isEmpty()
              || !end.test(current.get(current.size()-1)))
            if(!source.tryAdvance(getChunk))
                return false;
        current.remove(current.size()-1); // drop the end-delimiter line
        action.accept(current);
        current = null;
        return true;
    }

    public Spliterator<List<String>> trySplit() {
        return null; // chunk boundaries are unknown in advance: no splitting
    }

    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    public int characteristics() {
        return ORDERED | NONNULL;
    }

    public static Stream<List<String>> toChunks(Stream<String> lines,
            Predicate<String> chunkStart, Predicate<String> chunkEnd,
            boolean parallel) {
        return StreamSupport.stream(
            new ChunkSpliterator(lines.spliterator(), chunkStart, chunkEnd),
            parallel);
    }
}

The lines matching the predicates are not included in the chunk; it would be easy to alter this behavior, if desired (see the sketch below).
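
As a sketch of such an alteration (not part of the original answer), retaining the delimiter lines inside each chunk only needs two small changes:

// in the constructor: also keep the start line when a chunk begins
getChunk = s -> {
    if(current != null)
        current.add(s);
    else if(start.test(s)) {
        current = new ArrayList<>();
        current.add(s); // retain the start delimiter
    }
};

// in tryAdvance: retain the end delimiter as well, i.e. delete the line
//     current.remove(current.size()-1);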

It can be used like this:

ChunkSpliterator.toChunks(
        Files.lines(Paths.get(myFile)),
        Pattern.compile("^<start>$").asPredicate(),
        Pattern.compile("^<stop>$").asPredicate(),
        true)
    .collect(new MyProcessOneBucketCollector<>())
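
Since MyProcessOneBucketCollector is only a placeholder from the question, here is a self-contained variant using standard collectors instead (the file name and the joining step are made up for illustration):

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class ChunkDemo {
    public static void main(String[] args) throws Exception {
        // joins each chunk's lines into one string; "data.txt" is a made-up name
        List<String> buckets = ChunkSpliterator.toChunks(
                Files.lines(Paths.get("data.txt")),
                Pattern.compile("^<start>$").asPredicate(),
                Pattern.compile("^<stop>$").asPredicate(),
                true)
            .map(chunk -> String.join("\n", chunk))
            .collect(Collectors.toList());
        buckets.forEach(System.out::println);
    }
}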

The patterns are specified as ^word$ to require the entire line to consist of that word only; without these anchors, lines merely containing the pattern could start and end a chunk. The nature of the source stream does not allow parallelism when creating the chunks, so when chaining an immediate collection operation, the parallelism of the entire operation is rather limited. It depends on the MyProcessOneBucketCollector whether there can be any parallelism at all.

If the final result does not depend on the order of occurrence of the buckets in the source file, it is strongly recommended to use either a collector that reports itself as UNORDERED or to insert an unordered() into the stream's method chain before the collect (see the sketches below).
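
Both options, sketched (lines, startPred and stopPred are placeholder variables, and the counting collector is purely illustrative, not the MyProcessOneBucketCollector from the question):

// Option 1: insert unordered() into the chain before collect
ChunkSpliterator.toChunks(lines, startPred, stopPred, true)
    .unordered()
    .collect(new MyProcessOneBucketCollector<>());

// Option 2: use a collector that reports itself as UNORDERED,
// here one that merely counts the buckets
Collector<List<String>, long[], Long> countBuckets = Collector.of(
    () -> new long[1],                      // container
    (acc, chunk) -> acc[0]++,               // accumulate one bucket
    (a, b) -> { a[0] += b[0]; return a; },  // combine partial counts
    acc -> acc[0],                          // finish
    Collector.Characteristics.UNORDERED);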

java parallel-processing java-8 java-stream
