java - Turn multiple Cassandra partitions into Akka stream -


i'm trying develop source turns set of cassandra partitions akka stream (i execute multiple cassandra queries through bound statement, 1 each partition , mapconcat result iterable of dtos):

source<result<journalentry>, notused> src = source.unfoldasync(querybag, s -> {      boundstatement bound = getboundstatement(querybag);      // out data cassandra:     future<result<journalentry>> page =          future.apply(() -> journalmapper.map(session.execute(bound)), ec);      future<optional<pair<querybag,result<journalentry>>>> next = page.map(r -> {         optional<pair<querybag,result<journalentry>>> opt;         if (r.isexhausted()) {             opt = optional.empty();         } else {             opt = optional.of(pair.apply(querybag.incrementpartitionid(), r));         }         return opt;     }, ec);     return futureconverters.tojava(next); });  source<journalentry, notused> concat = src.mapconcat(res -> () -> res.iterator());  // materialize concat , consume stream 

is safe practice? in particular, i'm concerned generating iterable through mapconcat((res) -> () -> res.iterator()), multiple calls of iterator() method of resulting iterable same instance of iterator.

besides, sensible approach iterate across different cassandra partitions or more appropriate use graphstage?


Comments

Popular posts from this blog

qt - QML MouseArea onWheel event not working properly when inside QML Scrollview -

java - is not an enclosing class / new Intent Cannot Resolve Constructor -

python - Error importing VideoFileClip from moviepy : AttributeError: 'PermissionError' object has no attribute 'message' -