parallel processing - How to process chunks of a file with java.util.stream
To get familiar with the Stream API, I tried to code a quite simple pattern.

Problem: I have a text file containing non-nested blocks of text. The blocks are identified by start/end patterns, e.g. <start> and <stop>. The content of a block is not syntactically distinguishable from the noise between the blocks. Hence it is impossible to work with simple (stateless) lambdas.
I was only able to implement something ugly like:

Files.lines(path).collect(new MySequentialParseAndProsessEachLineCollector<>());

To be honest, this is not what I want.
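For illustration only, here is a minimal sketch of what such a sequential, stateful parse could look like (the class and method names are hypothetical, not from the question):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Stream;

// Hypothetical sketch of the "ugly" sequential approach: a single stateful
// pass that remembers whether it is currently inside a block. This is
// inherently sequential, which is exactly what the question wants to avoid.
class SequentialBlockParse {
    static List<List<String>> collectBlocks(Stream<String> lines,
            Predicate<String> start, Predicate<String> end) {
        List<List<String>> blocks = new ArrayList<>();
        List<String> current = null;            // null = outside any block
        Iterator<String> it = lines.iterator(); // forces sequential traversal
        while (it.hasNext()) {
            String line = it.next();
            if (current == null) {
                if (start.test(line)) current = new ArrayList<>();
            } else if (end.test(line)) {
                blocks.add(current);            // block complete
                current = null;
            } else {
                current.add(line);              // line inside a block
            }
        }
        return blocks;
    }
}
```

The mutable `current` variable is why this cannot be expressed with stateless lambdas: the handling of each line depends on what came before it.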
I'm looking for a mapper, something like:

Files.lines(path).map(MyMapAllLinesOfBlockToBuckets()).parallelStream().collect(new MyProcessOneBucketCollector<>());
"Is there a good way to extract chunks of data from a java 8 stream?" seems to contain the skeleton of a solution. Unfortunately, I'm too stupid to translate it to my problem. ;-)

Any hints?
Here is a solution which can be used to convert a Stream<String>, each element representing a line, into a Stream<List<String>>, each element representing a chunk found using the specified delimiters:
public class ChunkSpliterator implements Spliterator<List<String>> {
    private final Spliterator<String> source;
    private final Predicate<String> start, end;
    private final Consumer<String> getChunk;
    private List<String> current;

    ChunkSpliterator(Spliterator<String> lineSpliterator,
                     Predicate<String> chunkStart, Predicate<String> chunkEnd) {
        source = lineSpliterator;
        start = chunkStart;
        end = chunkEnd;
        getChunk = s -> {
            if(current != null) current.add(s);                 // inside a chunk
            else if(start.test(s)) current = new ArrayList<>(); // chunk begins
        };
    }

    public boolean tryAdvance(Consumer<? super List<String>> action) {
        // pull lines until the current chunk ends (or the source is exhausted)
        while(current == null || current.isEmpty()
              || !end.test(current.get(current.size()-1)))
            if(!source.tryAdvance(getChunk)) return false;
        current.remove(current.size()-1);                       // drop the end delimiter
        action.accept(current);
        current = null;
        return true;
    }

    public Spliterator<List<String>> trySplit() {
        return null;
    }

    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    public int characteristics() {
        return ORDERED | NONNULL;
    }

    public static Stream<List<String>> toChunks(Stream<String> lines,
            Predicate<String> chunkStart, Predicate<String> chunkEnd,
            boolean parallel) {
        return StreamSupport.stream(
            new ChunkSpliterator(lines.spliterator(), chunkStart, chunkEnd),
            parallel);
    }
}
The lines matching the predicates are not included in the chunk; it would be easy to change this behavior, if desired.
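For instance, one possible variant (my own hypothetical sketch, not part of the original answer) keeps the delimiter lines inside each emitted chunk:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical variant of the answer's ChunkSpliterator that keeps the
// <start>/<stop> delimiter lines inside each emitted chunk.
class InclusiveChunkSpliterator implements Spliterator<List<String>> {
    private final Spliterator<String> source;
    private final Predicate<String> start, end;
    private final Consumer<String> getChunk;
    private List<String> current;

    InclusiveChunkSpliterator(Spliterator<String> lineSpliterator,
            Predicate<String> chunkStart, Predicate<String> chunkEnd) {
        source = lineSpliterator;
        start = chunkStart;
        end = chunkEnd;
        getChunk = s -> {
            if (current != null) current.add(s);
            else if (start.test(s)) {
                current = new ArrayList<>();
                current.add(s);                 // keep the start delimiter
            }
        };
    }

    public boolean tryAdvance(Consumer<? super List<String>> action) {
        // Require at least the start line plus one more line before testing
        // the end predicate, so a start line that also matches the end
        // predicate does not terminate the chunk immediately.
        while (current == null || current.size() < 2
                || !end.test(current.get(current.size() - 1)))
            if (!source.tryAdvance(getChunk)) return false;
        action.accept(current);                 // end delimiter stays in place
        current = null;
        return true;
    }

    public Spliterator<List<String>> trySplit() { return null; }
    public long estimateSize() { return Long.MAX_VALUE; }
    public int characteristics() { return ORDERED | NONNULL; }

    public static Stream<List<String>> toChunks(Stream<String> lines,
            Predicate<String> chunkStart, Predicate<String> chunkEnd,
            boolean parallel) {
        return StreamSupport.stream(
            new InclusiveChunkSpliterator(lines.spliterator(), chunkStart, chunkEnd),
            parallel);
    }
}
```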
It can be used like this:
ChunkSpliterator.toChunks(
        Files.lines(Paths.get(myFile)),
        Pattern.compile("^<start>$").asPredicate(),
        Pattern.compile("^<stop>$").asPredicate(),
        true)
    .collect(new MyProcessOneBucketCollector<>())
The patterns specify ^word$ to require that the entire line consists of the word only; without these anchors, any line containing the pattern could start or end a chunk. The nature of the source stream does not allow parallelism when creating the chunks, so when chaining an immediate collection operation, the parallelism of the entire operation is rather limited. It depends on the MyProcessOneBucketCollector whether there can be any parallelism at all.
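To make this concrete, here is a self-contained, runnable rendering of the above (class renamed to avoid clashing with the original, and the standard Collectors.toList() standing in for the hypothetical MyProcessOneBucketCollector):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Same logic as the answer's ChunkSpliterator, packaged with a small demo
// helper that collects each chunk into a plain List.
class ChunkSpliteratorDemo implements Spliterator<List<String>> {
    private final Spliterator<String> source;
    private final Predicate<String> start, end;
    private final Consumer<String> getChunk;
    private List<String> current;

    ChunkSpliteratorDemo(Spliterator<String> lineSpliterator,
            Predicate<String> chunkStart, Predicate<String> chunkEnd) {
        source = lineSpliterator;
        start = chunkStart;
        end = chunkEnd;
        getChunk = s -> {
            if (current != null) current.add(s);
            else if (start.test(s)) current = new ArrayList<>();
        };
    }

    public boolean tryAdvance(Consumer<? super List<String>> action) {
        while (current == null || current.isEmpty()
                || !end.test(current.get(current.size() - 1)))
            if (!source.tryAdvance(getChunk)) return false;
        current.remove(current.size() - 1);   // drop the end delimiter
        action.accept(current);
        current = null;
        return true;
    }

    public Spliterator<List<String>> trySplit() { return null; }
    public long estimateSize() { return Long.MAX_VALUE; }
    public int characteristics() { return ORDERED | NONNULL; }

    static Stream<List<String>> toChunks(Stream<String> lines,
            Predicate<String> chunkStart, Predicate<String> chunkEnd,
            boolean parallel) {
        return StreamSupport.stream(
            new ChunkSpliteratorDemo(lines.spliterator(), chunkStart, chunkEnd),
            parallel);
    }

    // Collect every chunk of the given line stream into a list of lists.
    static List<List<String>> chunksOf(Stream<String> lines) {
        return toChunks(lines,
                Pattern.compile("^<start>$").asPredicate(),
                Pattern.compile("^<stop>$").asPredicate(),
                true)
            .collect(Collectors.toList());
    }
}
```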
If the final result does not depend on the order in which the buckets occur in the source file, it is strongly recommended that either the collector reports itself as UNORDERED or you insert an unordered() into the stream's method chain before the collect.
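As a hypothetical sketch of that recommendation (the names are mine, not from the answer), a collector can declare the UNORDERED characteristic via Collector.of; alternatively the stream itself can be marked with unordered():

```java
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

// Toy "process one bucket" collector that merely counts lines across all
// buckets; declaring UNORDERED tells the framework that the encounter order
// of the buckets is irrelevant, which can help parallel execution.
class UnorderedBucketCollector {
    static final Collector<List<String>, int[], Integer> TOTAL_LINES =
        Collector.of(
            () -> new int[1],                         // mutable accumulator
            (acc, bucket) -> acc[0] += bucket.size(), // fold one bucket in
            (a, b) -> { a[0] += b[0]; return a; },    // merge partial results
            acc -> acc[0],                            // finisher
            Collector.Characteristics.UNORDERED);

    static int totalLines(Stream<List<String>> buckets) {
        // Equivalent hint from the stream side: buckets.unordered().collect(...)
        return buckets.collect(TOTAL_LINES);
    }
}
```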
java parallel-processing java-8 java-stream