Java - Stream Processing


Operation | Aggregate operation
Map objects to another value as specified by a Function object | <R> Stream<R> map(Function<? super T, ? extends R> mapper)
Perform an action as specified by a Consumer object | void forEach(Consumer<? super T> action)
members
    .stream()
    .filter(
        p -> p.getGender() == Person.Sex.MALE
            && p.getAge() >= 18
            && p.getAge() <= 25)
    .map(p -> p.getEmailAddress())
    .forEach(email -> System.out.println(email));
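The pipeline above depends on a `members` list and a `Person` class that are not shown. The sketch below is a self-contained version under that assumption: the `Person` class, its constructor, and the sample data are hypothetical, and only the getters used by the pipeline are defined. The terminal `forEach` is kept in `main`, while the helper collects to a list so the result can be checked.

```java
import java.util.List;
import java.util.stream.Collectors;

public class YoungMaleEmails {

    // Hypothetical minimal Person class: only the getters used by the pipeline
    static final class Person {
        enum Sex { MALE, FEMALE }

        private final Sex gender;
        private final int age;
        private final String emailAddress;

        Person(Sex gender, int age, String emailAddress) {
            this.gender = gender;
            this.age = age;
            this.emailAddress = emailAddress;
        }

        Sex getGender() { return gender; }
        int getAge() { return age; }
        String getEmailAddress() { return emailAddress; }
    }

    // Same filter and map stages as the original pipeline, collected into a list
    static List<String> youngMaleEmails(List<Person> members) {
        return members
                .stream()
                .filter(
                        p -> p.getGender() == Person.Sex.MALE
                                && p.getAge() >= 18
                                && p.getAge() <= 25)
                .map(p -> p.getEmailAddress())
                .collect(Collectors.toList());
    }

    // Hypothetical sample data
    static List<Person> sampleMembers() {
        return List.of(
                new Person(Person.Sex.MALE, 20, "fred@example.com"),
                new Person(Person.Sex.FEMALE, 22, "jane@example.com"),
                new Person(Person.Sex.MALE, 40, "bob@example.com"));
    }

    public static void main(String[] args) {
        // Terminal forEach, as in the original: prints fred@example.com
        youngMaleEmails(sampleMembers()).forEach(email -> System.out.println(email));
    }
}
```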
Aggregate operations such as the following process elements from a stream (a pipeline), not directly from a collection:
  • filter,
  • map,
  • forEach
Aggregate operations accept lambda expressions as parameters.
Aggregate vs Iterator
Fundamental differences between Aggregate operations (like forEach) and iterators.
Parallelism
Aggregate operations:
  • determine which collection they iterate,
  • and delegate how to iterate the collection to the JDK (internal iteration).
That's why aggregate operations do not contain a method like next to instruct them to process the next element of the collection. With external iteration, the application determines both which collection it iterates and how it iterates it. External iteration can only iterate over the elements of a collection sequentially. Internal iteration does not have this limitation: it can process the elements in parallel by splitting the work into parts and combining the results (a map-reduce paradigm). See Parallelism.
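A minimal sketch contrasting the two styles on the same sum. The external version drives the loop itself with `hasNext`/`next`; the internal version only describes the work, so switching from `stream()` to `parallelStream()` is enough to let the library split it across threads. The class and method names are illustrative.

```java
import java.util.Iterator;
import java.util.List;

public class IterationStyles {

    // External iteration: the application decides how to iterate (hasNext/next), sequentially
    static int sumExternal(List<Integer> numbers) {
        int sum = 0;
        Iterator<Integer> it = numbers.iterator();
        while (it.hasNext()) {
            sum += it.next();
        }
        return sum;
    }

    // Internal iteration: the stream library decides how to iterate;
    // parallelStream() lets it split the work and combine the partial sums
    static int sumInternal(List<Integer> numbers, boolean parallel) {
        return (parallel ? numbers.parallelStream() : numbers.stream())
                .mapToInt(Integer::intValue)
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5);
        System.out.println(sumExternal(numbers));        // 15
        System.out.println(sumInternal(numbers, false)); // 15
        System.out.println(sumInternal(numbers, true));  // 15
    }
}
```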
Stream
They process elements from a stream: Aggregate operations process elements from a stream, not directly from a collection. Consequently, they are also called stream operations.
Behavior
They support behavior as parameters: You can specify lambda expressions as parameters for most aggregate operations. This enables you to customize the behavior of a particular aggregate operation.
Pipeline
See also: Data Processing - Data Flow (ETL | Workflow | Pipeline).
A pipeline is a sequence of stream operations (i.e. aggregate operations). A stream is a sequence of elements. Unlike a collection, it is not a data structure that stores elements. Instead, a stream carries values from a source through a pipeline. A stream can be created, for example, by invoking the stream method of a collection.
A pipeline contains the following components:
  • A source, which could be:
    • a collection,
    • an array,
    • a generator function,
    • or an I/O channel.
  • Zero or more intermediate operations. An intermediate operation, such as filter, produces a new stream.
  • A terminal operation. A terminal operation, such as forEach, produces a non-stream result, such as:
    • a single value, such as a primitive (like a double value). Operations such as average, sum, min, max, and count return one value by combining the contents of a stream; they are called reduction operations (see the Reduction operation documentation),
    • a collection,
    • or in the case of forEach, no value at all.
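A small sketch of the three pipeline components with different sources (a collection, an array, a generator function) and different terminal operations (a reduction, a collection, and `forEach` with no value). The method names and data are illustrative only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PipelineSources {

    // Source: a collection; intermediate: filter; terminal: count (a reduction)
    static long countLongWords(List<String> words) {
        return words.stream()
                .filter(w -> w.length() > 3)
                .count();
    }

    // Source: an array; intermediate: map; terminal: collect (returns a collection)
    static List<String> upperCase(String[] words) {
        return Arrays.stream(words)
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    // Source: a generator function (infinite, cut by limit); terminal: sum
    static int sumOfFirstPowersOfTwo(int n) {
        return Stream.iterate(1, x -> x * 2)
                .limit(n)
                .mapToInt(Integer::intValue)
                .sum();
    }

    public static void main(String[] args) {
        System.out.println(countLongWords(List.of("a", "stream", "is", "lazy"))); // 2
        System.out.println(upperCase(new String[] {"a", "b"}));                   // [A, B]
        System.out.println(sumOfFirstPowersOfTwo(4));                             // 15
        // Terminal forEach: no value at all, only the side effect of printing
        Stream.of("x", "y").forEach(System.out::println);
    }
}
```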
The Java compiler infers the types of the lambda parameters from the context.
Example
https://docs.oracle.com/javase/tutorial/collections/streams/examples/BulkDataOperationsExamples.java
Average
average age of all male members
double average = members
    .stream() // create a stream of members
    .filter(p -> p.getGender() == Person.Sex.MALE) // return a stream of the male members
    .mapToInt(Person::getAge) // return the age of each male member in a new IntStream
    .average() // average them (returns an OptionalDouble)
    .getAsDouble(); // return it as a double (throws if there is no male member)
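Because `average()` returns an `OptionalDouble`, calling `getAsDouble()` on an empty result throws `NoSuchElementException`. The sketch below falls back to a caller-supplied default instead. The `Person` record (Java 16+) with a boolean `male` field is a hypothetical stand-in for the tutorial's class, and the choice of default is an assumption left to the caller.

```java
import java.util.List;
import java.util.OptionalDouble;

public class AverageAge {

    // Hypothetical minimal Person record, standing in for the tutorial's class
    record Person(boolean male, int age) {}

    static double averageMaleAge(List<Person> members, double defaultValue) {
        OptionalDouble average = members.stream()
                .filter(Person::male)  // keep the male members
                .mapToInt(Person::age) // IntStream of their ages
                .average();            // empty OptionalDouble if no member is left
        // getAsDouble() would throw on an empty result, so use a default instead
        return average.orElse(defaultValue);
    }

    public static void main(String[] args) {
        List<Person> members = List.of(
                new Person(true, 20), new Person(false, 50), new Person(true, 30));
        System.out.println(averageMaleAge(members, 0.0));   // 25.0
        System.out.println(averageMaleAge(List.of(), 0.0)); // 0.0
    }
}
```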
Operation
Peek
The peek method exists mainly to support debugging and logging, where you want to see the elements as they flow past a certain point in a pipeline. Because its Consumer records the elements somewhere (for example, in a log), it is considered stateful here.
 Stream.of(1, 2, 3, 4)
      .filter(e -> e > 2)
      .peek(e -> System.out.println("Peek: Value after the Filter stage: " + e))
      .map(e -> e + 1)
      .peek(e -> System.out.println("Peek: Value after the Mapped value: " + e))
      .collect(Collectors.toList());
Peek: Value after the Filter stage: 3
Peek: Value after the Mapped value: 4
Peek: Value after the Filter stage: 4
Peek: Value after the Mapped value: 5

Reduce
Stream.reduce is a general-purpose reduction operation. Example: a sum implementation.
Integer totalAgeReduce = roster
   .stream()
   .map(Person::getAge)
   .reduce(
       0,
       (a, b) -> a + b
   );
where:
  • 0 is the value of the identity argument. It is both:
    • the initial value of the reduction
    • and the default result if there are no elements in the stream.
  • (a, b) -> a + b is a lambda expression that defines the accumulator function, where:
    • a is the partial result of all the elements processed so far (in this case, a sum),
    • b is the next element of the stream,
    • and a + b computes the new partial result.
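A minimal sketch of the same reduction on a plain list of ages, showing both roles of the identity: it is the starting value of the sum, and it is the result when the stream is empty. The class and method names are illustrative.

```java
import java.util.List;

public class ReduceSum {

    // Identity 0 and accumulator (a, b) -> a + b, as in the roster example
    static int sum(List<Integer> ages) {
        return ages.stream().reduce(0, (a, b) -> a + b);
    }

    public static void main(String[] args) {
        System.out.println(sum(List.of(20, 30, 40))); // 90, i.e. ((0 + 20) + 30) + 40
        System.out.println(sum(List.of()));           // 0, the identity: no elements
    }
}
```

Since a parallel reduction starts each chunk from the identity, the identity must really be neutral for the operation (0 for addition); a value such as 10 could be added once per chunk.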
The reduce method always creates a new value when it processes an element. When the reduction just adds elements to a collection, the accumulator function must, for every element it processes:
  • create a new collection,
  • and add the elements to this new collection,
which is particularly inefficient. The Stream.collect method solves this problem because it updates an existing collection instead.
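A sketch of the problem and its fix, building the same list both ways. The `reduce` version has to copy the partial list on every element (it must not mutate the identity), while the three-argument `collect` mutates one container. The names are illustrative; both return `[a, b, c]` for the input shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class ReduceVsCollect {

    // Inefficient: every accumulator call copies the partial list into a brand-new list
    static List<String> withReduce(Stream<String> words) {
        List<String> empty = new ArrayList<>(); // identity, never mutated
        return words.reduce(
                empty,
                (list, word) -> {             // accumulator: copy, then add
                    List<String> copy = new ArrayList<>(list);
                    copy.add(word);
                    return copy;
                },
                (left, right) -> {            // combiner, used by parallel streams
                    List<String> copy = new ArrayList<>(left);
                    copy.addAll(right);
                    return copy;
                });
    }

    // Efficient: collect creates a container and updates it in place
    static List<String> withCollect(Stream<String> words) {
        return words.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
    }

    public static void main(String[] args) {
        System.out.println(withReduce(Stream.of("a", "b", "c")));  // [a, b, c]
        System.out.println(withCollect(Stream.of("a", "b", "c"))); // [a, b, c]
    }
}
```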
Collect
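Like the reduce method and all other reduction methods, the collect method returns only one value. Unlike reduce, it accumulates the elements into a mutable container that it updates in place. See https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html#collect
Examples: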
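  • Flatten a stream of lists into one list (Obj::functionThatReturnsAList is a placeholder for any method that returns a List):
stream
    .map(Obj::functionThatReturnsAList)
    .collect(ArrayList::new, List::addAll, List::addAll);

  • Collect a stream of strings into a list:
List<String> asList = stringStream.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);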
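A concrete, runnable version of the flattening example. The `Team` record (Java 16+) and its `members()` method are hypothetical stand-ins for `Obj::functionThatReturnsAList`. The three arguments of `collect` are the supplier (a new container), the accumulator (add one element's list), and the combiner (merge two partial containers, used by parallel streams). A `flatMap` plus `Collectors.toList()` variant is shown for comparison.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class CollectExamples {

    // Hypothetical type whose method returns a List
    record Team(List<String> members) {}

    // Supplier, accumulator, combiner
    static List<String> allMembers(List<Team> teams) {
        return teams.stream()
                .map(Team::members)
                .collect(ArrayList::new, List::addAll, List::addAll);
    }

    // Same result with flatMap and a built-in Collector
    static List<String> allMembersFlatMap(List<Team> teams) {
        return teams.stream()
                .flatMap(team -> team.members().stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Team> teams = List.of(new Team(List.of("ann", "bob")), new Team(List.of("cid")));
        System.out.println(allMembers(teams));        // [ann, bob, cid]
        System.out.println(allMembersFlatMap(teams)); // [ann, bob, cid]
    }
}
```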
Interface
Function
Compose
Function<String, String> helloOp = (s) -> "Hello " + s;
Function<String, String> shoutedOp = (s) -> s + " !!!";
// shoutedOp.compose(helloOp) applies helloOp first, then shoutedOp
Function<String, String> helloShoutedOp = shoutedOp.compose(helloOp);
String helloWorldShouted = helloShoutedOp.apply("World");
System.out.println(helloWorldShouted);
Hello World !!!
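In the example above the order happens not to matter, because adding a prefix and adding a suffix commute. The sketch below uses two functions that do not commute to show the difference between `compose` (argument applied first) and `andThen` (argument applied last). The function names are illustrative.

```java
import java.util.function.Function;

public class ComposeVsAndThen {

    static final Function<String, String> HELLO = s -> "Hello " + s;
    static final Function<String, String> UPPER = String::toUpperCase;

    // UPPER.compose(HELLO) = UPPER(HELLO(s)): HELLO runs first
    static String upperAfterHello(String s) {
        return UPPER.compose(HELLO).apply(s);
    }

    // UPPER.andThen(HELLO) = HELLO(UPPER(s)): UPPER runs first
    static String helloAfterUpper(String s) {
        return UPPER.andThen(HELLO).apply(s);
    }

    public static void main(String[] args) {
        System.out.println(upperAfterHello("World")); // HELLO WORLD
        System.out.println(helloAfterUpper("World")); // Hello WORLD
    }
}
```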

Type
Infinite
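Infinite streams are produced by generating functions such as Stream.generate or Stream.iterate. To make such a stream finite, chain a short-circuiting operation to it, typically limit (or, since Java 9, takeWhile).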
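A sketch of infinite sources made finite by short-circuiting operations: `limit` on `Stream.generate`, `takeWhile` (Java 9+) on `Stream.iterate`, and a short-circuiting terminal operation (`findFirst`) that stops pulling elements once it has a match. The method names are illustrative.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class InfiniteStreams {

    // Stream.generate: infinite stream from a Supplier, cut after n elements
    static List<String> firstTicks(int n) {
        return Stream.generate(() -> "tick")
                .limit(n)
                .collect(Collectors.toList());
    }

    // Stream.iterate: seed 1, then doubling; takeWhile stops at the first failing element
    static List<Integer> powersOfTwoBelow(int bound) {
        return Stream.iterate(1, x -> x * 2)
                .takeWhile(x -> x < bound)
                .collect(Collectors.toList());
    }

    // findFirst ends the traversal as soon as one square exceeds the bound
    static int firstSquareAbove(int bound) {
        return Stream.iterate(1, x -> x + 1)
                .map(x -> x * x)
                .filter(x -> x > bound)
                .findFirst()
                .get();
    }

    public static void main(String[] args) {
        System.out.println(firstTicks(3));        // [tick, tick, tick]
        System.out.println(powersOfTwoBelow(20)); // [1, 2, 4, 8, 16]
        System.out.println(firstSquareAbove(50)); // 64
    }
}
```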
Finite
If you want to create a non-empty finite stream that is not backed by an array or collection, and none of the existing stream sources fits, you have to implement your own Spliterator and create a stream out of it with StreamSupport.stream.
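A minimal sketch of such a custom source, assuming a hypothetical countdown from `start` to 1 that is backed by no collection. Extending `Spliterators.AbstractSpliterator` means only `tryAdvance` has to be written (splitting falls back to a default batching strategy). Returning `false` from `tryAdvance` signals the end of the stream.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class CountdownStream {

    // Hypothetical finite source: start, start - 1, ..., 1
    static final class CountdownSpliterator extends Spliterators.AbstractSpliterator<Integer> {
        private int next;

        CountdownSpliterator(int start) {
            // size estimate and characteristics of the produced elements
            super(Math.max(start, 0), Spliterator.ORDERED | Spliterator.NONNULL);
            this.next = start;
        }

        @Override
        public boolean tryAdvance(Consumer<? super Integer> action) {
            if (next < 1) {
                return false; // no more elements: end of the stream
            }
            action.accept(next--);
            return true;
        }
    }

    static Stream<Integer> countdown(int start) {
        return StreamSupport.stream(new CountdownSpliterator(start), false); // false = sequential
    }

    public static void main(String[] args) {
        List<Integer> values = countdown(3).collect(Collectors.toList());
        System.out.println(values); // [3, 2, 1]
    }
}
```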
Stateful vs stateless
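Stateful here means that the function implementing the interface keeps state (for example, a field or a collection) that records the elements that have passed by. Collector and Consumer are two interfaces that are, by definition, stateful. Note that the java.util.stream package documentation uses the term for intermediate operations instead: there, distinct and sorted are stateful, while filter and map are stateless.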
Operation | Type | Why
collect(Collector) | Stateful | collects all the elements in a collection (the state)
forEach(Consumer) | Stateful | the Consumer is by definition stateful, except if it is a black hole (no-op)
peek(Consumer) | Stateful | the Consumer is by definition stateful: why peek if not to store the element somewhere (e.g. a log)?
distinct | Stateful | must remember the elements it has already seen
Predicate, Function, UnaryOperator, BinaryOperator, and Comparator | Stateless |
Implementation
The standard way of implementing a stream source is to implement a Spliterator or, alternatively, an Iterator. In either case, the implementation has a way to report the end of the stream when:
  • Spliterator.tryAdvance returns false or its forEachRemaining method just returns,
  • or in case of an Iterator source, when hasNext() returns false.
You can use an existing method to create a Spliterator out of an Iterator, but you should resist the temptation to use an Iterator just because it is familiar: a Spliterator is not hard to implement.
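For the Iterator route, `Spliterators.spliteratorUnknownSize` wraps an existing `Iterator` and `StreamSupport.stream` turns the result into a stream; the stream ends when `hasNext()` returns `false`. A minimal sketch, with an illustrative helper name:

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorToStream {

    // Wraps an Iterator in a Spliterator of unknown size, then in a sequential Stream
    static <T> Stream<T> fromIterator(Iterator<T> iterator) {
        Spliterator<T> spliterator =
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
        return StreamSupport.stream(spliterator, false);
    }

    public static void main(String[] args) {
        Iterator<String> it = List.of("a", "b", "c").iterator();
        List<String> upper = fromIterator(it)
                .map(String::toUpperCase)
                .collect(Collectors.toList());
        System.out.println(upper); // [A, B, C]
    }
}
```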
Execution
Streams are lazy: they start executing only when they encounter the terminal (final) operation. Until then, intermediate operations such as filter or map only describe the pipeline.
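A sketch that makes the laziness visible by recording which elements a filter stage actually receives. Without a terminal operation the filter never runs; with the short-circuiting `findFirst`, it stops after the first match. The side effect inside the filter is for demonstration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class LazinessDemo {

    // Returns the elements that reached the filter stage
    static List<Integer> seenByFilter(boolean runTerminal) {
        List<Integer> seen = new ArrayList<>();
        Stream<Integer> pipeline = Stream.of(1, 2, 3, 4, 5)
                .filter(x -> {
                    seen.add(x); // side effect, for demonstration only
                    return x % 2 == 0;
                });
        if (runTerminal) {
            pipeline.findFirst(); // short-circuiting terminal operation
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(seenByFilter(false)); // []: no terminal operation, nothing ran
        System.out.println(seenByFilter(true));  // [1, 2]: stops at the first even number
    }
}
```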
Close
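// try-with-resources calls close() at the end of the block, which runs the onClose handler:
// prints 1 to 9, then --
try (IntStream stream = IntStream.range(1, 10)) {
      stream.onClose(() -> System.out.println("--")).forEach(x -> System.out.println(x));
}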