java - Turn multiple Cassandra partitions into Akka stream
I'm trying to develop a Source that turns a set of Cassandra partitions into an Akka stream (I execute multiple Cassandra queries through a bound statement, one for each partition, and mapConcat the result into an Iterable of DTOs):
    Source<Result<JournalEntry>, NotUsed> src = Source.unfoldAsync(queryBag, s -> {
        BoundStatement bound = getBoundStatement(queryBag);
        // Get the data out of Cassandra:
        Future<Result<JournalEntry>> page = Future.apply(() -> journalMapper.map(session.execute(bound)), ec);
        Future<Optional<Pair<QueryBag, Result<JournalEntry>>>> next = page.map(r -> {
            Optional<Pair<QueryBag, Result<JournalEntry>>> opt;
            if (r.isExhausted()) {
                opt = Optional.empty();
            } else {
                opt = Optional.of(Pair.apply(queryBag.incrementPartitionId(), r));
            }
            return opt;
        }, ec);
        return FutureConverters.toJava(next);
    });

    Source<JournalEntry, NotUsed> concat = src.mapConcat(res -> () -> res.iterator());
    // Materialize concat and consume the stream
Is this a safe practice? In particular, I'm concerned about generating an Iterable through mapConcat((res) -> () -> res.iterator()), because multiple calls to the iterator() method of the resulting Iterable will return the same instance of Iterator.
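To make the concern concrete, here is a minimal sketch in plain Java, with no Akka or Cassandra involved. The class and the helpers `singlePass`, `snapshot` and `count` are hypothetical names made up for illustration; `singlePass` only imitates a result set whose `iterator()` keeps handing back one shared iterator. It shows that such an Iterable can be traversed once, and that copying the remaining elements into a List is one way to get a repeatable Iterable, at the cost of holding the page in memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SinglePassIterableDemo {

    // Hypothetical stand-in for a paged result: every iterator() call
    // returns the same shared iterator instance.
    static <T> Iterable<T> singlePass(Iterator<T> shared) {
        return () -> shared;
    }

    // Copies whatever is left in the iterator into a list,
    // which can then be traversed any number of times.
    static <T> List<T> snapshot(Iterator<T> source) {
        List<T> copy = new ArrayList<>();
        source.forEachRemaining(copy::add);
        return copy;
    }

    // Counts the elements seen in one full traversal.
    static <T> int count(Iterable<T> iterable) {
        int n = 0;
        for (T ignored : iterable) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        Iterable<String> once = singlePass(Arrays.asList("a", "b", "c").iterator());
        System.out.println(count(once)); // 3: first traversal drains the shared iterator
        System.out.println(count(once)); // 0: second traversal finds it already exhausted

        List<String> safe = snapshot(Arrays.asList("a", "b", "c").iterator());
        System.out.println(count(safe)); // 3
        System.out.println(count(safe)); // 3: a list hands out a fresh iterator each time
    }
}
```

Whether the single-pass behaviour actually causes trouble depends on how often the consumer asks for an iterator per element, which is worth checking against the Akka version in use.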
Besides, is this a sensible approach to iterate across different Cassandra partitions, or would it be more appropriate to use a GraphStage?
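For reference, the unfold pattern used above can be sketched without Akka. This is only a blocking stand-in written for illustration, not the Akka API: `Step`, `unfold`, `fetch` and `readAll` are hypothetical names, an in-memory map plays the role of Cassandra, and `CompletableFuture` stands in for the asynchronous query. The step function receives the current partition id, returns the rows plus the next id, and returns an empty Optional to finish, so the first empty partition ends the sequence, as in the snippet above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class PartitionUnfoldDemo {

    // Next state plus the element to emit (the role Pair plays in the question).
    static final class Step<S, E> {
        final S nextState;
        final E element;

        Step(S nextState, E element) {
            this.nextState = nextState;
            this.element = element;
        }
    }

    // Blocking stand-in for an async unfold: keep calling f with the
    // current state until it answers with an empty Optional.
    static <S, E> List<E> unfold(S initial, Function<S, CompletableFuture<Optional<Step<S, E>>>> f) {
        List<E> emitted = new ArrayList<>();
        S state = initial;
        while (true) {
            Optional<Step<S, E>> step = f.apply(state).join();
            if (!step.isPresent()) {
                return emitted;
            }
            emitted.add(step.get().element);
            state = step.get().nextState;
        }
    }

    // Hypothetical "query": looks up one partition asynchronously.
    static CompletableFuture<Optional<Step<Integer, List<String>>>> fetch(
            Map<Integer, List<String>> partitions, Integer partitionId) {
        return CompletableFuture.supplyAsync(() -> {
            List<String> rows = partitions.getOrDefault(partitionId, Collections.<String>emptyList());
            if (rows.isEmpty()) {
                return Optional.<Step<Integer, List<String>>>empty(); // stop unfolding
            }
            return Optional.of(new Step<Integer, List<String>>(partitionId + 1, rows));
        });
    }

    // Reads partitions 0, 1, 2, ... until the first empty one, then flattens the pages.
    static List<String> readAll(Map<Integer, List<String>> partitions) {
        Integer firstPartition = 0;
        List<List<String>> pages = unfold(firstPartition, id -> fetch(partitions, id));
        List<String> flat = new ArrayList<>();
        for (List<String> page : pages) {
            flat.addAll(page); // same role as mapConcat in the question
        }
        return flat;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> partitions = new HashMap<>();
        partitions.put(0, Arrays.asList("e1", "e2"));
        partitions.put(1, Arrays.asList("e3"));
        // Partition 2 is absent, so the unfold stops there.
        System.out.println(readAll(partitions)); // [e1, e2, e3]
    }
}
```

In this sketch the step function works from the state it is handed (`id`) rather than from a captured variable. That distinction matters if the state object is immutable, and a gap in the partition ids would end the sequence early.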