1、基本介紹

1、創(chuàng)建方式

1、Array的Stream創(chuàng)建
1、直接創(chuàng)建
// mainStream stream = Stream.of("a", "b", "c");
String [] strArray = new String[] {"a", "b", "c"};
stream = Stream.of(strArray);



// Stream.of()@SafeVarargs@SuppressWarnings("varargs") // Creating a stream from an array is safepublic static<T> Stream<T> of(T... values) {    return Arrays.stream(values);
}
2、直接使用Arrays.stream工具創(chuàng)建
// mainString [] strArray = new String[] {"a", "b", "c"};
stream = Arrays.stream(strArray);

下面是Arrays.stream的具體實現(xiàn)

// Arrays.stream()public static <T> Stream<T> stream(T[] array) {    return stream(array, 0, array.length);
}
/** * Arrays.stream() * @param startInclusive 起始坐標 * @param endExclusive 最終坐標  */public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {    return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
}

StreamSupport.stream的實現(xiàn)使用的是ReferencePipeline.Head<>這個方法,注意這個方法,這個方法是Stream流水線解決方案的核心之一

// StreamSupport.stream()public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

注意這里的Spliterator,這個類是Stream實現(xiàn)并行的核心類。這里Array生成的spliterator的特征值是ordered和immutable。(目前沒看到關(guān)于特征值的相關(guān)操作,具體解釋可以看源碼的注釋)

/**  * ReferencePipeline.Head<>() * 默認生成一個ordered、immutable的Spliterator */public static <T> Spliterator<T> spliterator(T[] array, int startInclusive, int endExclusive) {    return Spliterators.spliterator(array, startInclusive, endExclusive,
                                    Spliterator.ORDERED | Spliterator.IMMUTABLE);
}
2、Collection的Stream創(chuàng)建
// mainList<Integer> integers = new ArrayList<>();
integers.stream();

Collection的Stream的創(chuàng)建使用的是Collection.stream方法

// Collection.stream()default Stream<E> stream() {        return StreamSupport.stream(spliterator(), false);
    }

這個spliterator(),創(chuàng)建的是Spliterator下的特增值為sized、subSized的Spliterator

// Collection.spliterator()@Override
    default Spliterator<E> spliterator() {        return Spliterators.spliterator(this, 0);
    }
// Spliterators.spliterator()public static <T> Spliterator<T> spliterator(Collection<? extends T> c,                                                 int characteristics) {        return new IteratorSpliterator<>(Objects.requireNonNull(c),
                                         characteristics);
    }
/**  * IteratorSpliterator<>() * 默認生成一個sized、subSized的Spliterator */public IteratorSpliterator(Collection<? extends T> collection, int characteristics) {            this.collection = collection;            this.it = null;            this.characteristics = (characteristics & Spliterator.CONCURRENT) == 0
                                   ? characteristics | Spliterator.SIZED | Spliterator.SUBSIZED
                                   : characteristics;
        }

最后還是將Spliterator放入ReferencePipeline.Head<>方法創(chuàng)建了Stream

// ReferencePipeline.Head<>public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }
3、其他創(chuàng)建方式
1、Stream.iterate()
Stream.iterate(1,i->i++)

該方法放入一個seed值作為種子值,使用第二個參數(shù)方法生成一個無限大小的Stream,特征值與Array的Stream特征值相同。

public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {
        Objects.requireNonNull(f);        final Iterator<T> iterator = new Iterator<T>() {            @SuppressWarnings("unchecked")
            T t = (T) Streams.NONE;            @Override
            public boolean hasNext() {                return true;
            }            @Override
            public T next() {                return t = (t == Streams.NONE) ? seed : f.apply(t);
            }
        };        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
                iterator,
                Spliterator.ORDERED | Spliterator.IMMUTABLE), false);
    }
2、Stream.generate()
Stream.generate(Math::random)

該方法沒有放入種子值,放入的是一個Supplier,該類就是java1.8以后加入的函數(shù)式接口,該接口只有一個方法就是get()方法,用于提供生成Stream需要的每一個的數(shù)據(jù),最后生成長度最大為9223372036854775807L(2的63次方-1)的Stream。

public static<T> Stream<T> generate(Supplier<T> s) {
        Objects.requireNonNull(s);        return StreamSupport.stream(                new StreamSpliterators.InfiniteSupplyingSpliterator.OfRef<>(Long.MAX_VALUE, s), false);
    }
@FunctionalInterfacepublic interface Supplier<T> {    /**     * Gets a result.     *     * @return a result     */
    T get();
}

2、中間操作(intermediate operation)

每一個流能有多個中間操作,中間操作的作用就是將原始的流轉(zhuǎn)化為需要的流,并且為惰性操作,關(guān)于惰性操作后面會有具體介紹。并且中間操作可分為有狀態(tài)和無狀態(tài)兩種,兩種不同的操作在構(gòu)成Stream流水線時會使用不同的創(chuàng)建方式和操作。

1、有狀態(tài)操作(statefulOp)

1、Stream<T> distinct();// 除去流種重復的元素

2、Stream<T> limit(long maxSize);// 只取前幾個元素

3、Stream<T> skip(long n);// 跳過前幾個元素

4、Stream<T> sorted();// 根據(jù)自然排序?qū)α髋判?/p>

5、Stream<T> sorted(Comparator<? super T> comparator);// 根據(jù)自己實現(xiàn)的排序?qū)α髋判?/p>

2、無狀態(tài)操作(statelessOp)

1、Stream<T> filter(Predicate<? super T> predicate);// 根據(jù)過濾規(guī)則過濾流種的元素

2、<R> Stream<R> map(Function<? super T, ? extends R> mapper);// 將每一個元素映射成另一個元素

3、<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);// 和上一個map映射不同的是,扁平映射會將流中的最基礎的元素映射出來

4、Stream<T> peek(Consumer<? super T> action);// 每一個元素都要做一下這個action

5、IntStream mapToInt(ToIntFunction<? super T> mapper);// 映射為IntStream

6、LongStream mapToLong(ToLongFunction<? super T> mapper);// 映射為LongStream

7、DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);// 映射為DoubleStream

2、結(jié)束操作(terminal operation)

每個流只能有一個結(jié)束操作,結(jié)束操作會將和之前的中間操作一同起作用,在使用了結(jié)束操作后該流便被消費掉了,不能再次使用。由于流可以是無限大的,所以也會有短路操作,當無限大的流使用了短路操作并且滿足了短路條件時便會直接結(jié)束。

1、非短路操作

1、void forEach(Consumer<? super T> action);// 每一個元素都要做一下這個aciton

2、void forEachOrdered(Consumer<? super T> action);// 確保并行時保持順序執(zhí)行這個action

3、Object[] toArray();// 轉(zhuǎn)化成Object數(shù)組

4、<A> A[] toArray(IntFunction<A[]> generator);// 轉(zhuǎn)化成自己定義的數(shù)組

5、T reduce(T identity, BinaryOperator<T> accumulator);// 匯聚,有起始值,操作

6、Optional<T> reduce(BinaryOperator<T> accumulator);// 匯聚,無起始值,返回的是Optional對象

7、<U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator,BinaryOperator<U> comb iner);// 匯聚,有起始值,操作,合并

8、<R> R collect(Supplier<R> supplier,BiConsumer<R, ? super T> accumulator,BiConsumer<R, R> combiner);// 可變匯聚,自己實現(xiàn)匯聚,容器、操作、合并操作

9、<R, A> R collect(Collector<? super T, A, R> collector);// 可變匯聚,Collectors有封裝工具

10、Optional<T> min(Comparator<? super T> comparator);// 封裝了reduce,使用自己的比較器找到最小的

11、Optional<T> max(Comparator<? super T> comparator);// 封裝了reduce,使用自己的比較器找到最大的

12、long count();// 封裝了reduce,把每個數(shù)變成1再求和

2、短路操作(short-circuiting)

1、boolean anyMatch(Predicate<? super T> predicate);// 有一個符合判斷

2、boolean noneMatch(Predicate<? super T> predicate);// 沒有一個符合判斷

3、Optional<T> findFirst();// 有序的,找到第一個

4、Optional<T> findAny();// 不要求有序的,找到一個

5、boolean (Predicate<? super T> predicate);// 全部符合判斷

2、Stream流水線解決方案

1、ReferencePipeLine

Stream中使用Stage的概念來描述一個完整的操作,并用某種實例化后的PipelineHelper來代表Stage,將具有先后順序的各個Stage連到一起,就構(gòu)成了整個流水線。跟Stream相關(guān)類和接口的繼承關(guān)系圖示。

Android培訓,安卓培訓,手機開發(fā)培訓,移動開發(fā)培訓,云培訓培訓

下面是源碼,Head表示Source stage,例如Collection.stream(),這里面沒有對數(shù)據(jù)的操作,StatelessOp和StatefuleOp分別對應無狀態(tài)和有狀態(tài)的中間操作

    /**     * Source stage of a ReferencePipeline.     *     * @param <E_IN> type of elements in the upstream source     * @param <E_OUT> type of elements in produced by this stage     * @since 1.8     */
    static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT>    
    @Override
        final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {            throw new UnsupportedOperationException();
        }
    /**     * Base class for a stateless intermediate stage of a Stream.     *     * @param <E_IN> type of elements in the upstream source     * @param <E_OUT> type of elements in produced by this stage     * @since 1.8     */
    abstract static class StatelessOp<E_IN, E_OUT>            extends ReferencePipeline<E_IN, E_OUT>
    /**     * Base class for a stateful intermediate stage of a Stream.     *     * @param <E_IN> type of elements in the upstream source     * @param <E_OUT> type of elements in produced by this stage     * @since 1.8     */
    abstract static class StatefulOp<E_IN, E_OUT>            extends ReferencePipeline<E_IN, E_OUT>

通過之前就可以看到的ReferencePipeline.Head<>生成第一個Source stage,緊接著調(diào)用一系列的中間操作,不斷產(chǎn)生新的Stream。這些Stream對象以雙向鏈表的形式組織在一起,構(gòu)成整個流水線,由于每個Stage都記錄了前一個Stage和本次的操作以及回調(diào)函數(shù),依靠這種結(jié)構(gòu)就能建立起對數(shù)據(jù)源的所有操作。這就是Stream記錄操作的方式。

Android培訓,安卓培訓,手機開發(fā)培訓,移動開發(fā)培訓,云培訓培訓

2、Sink

上面講到的是Stream流水線如何流水操作計算,但是如何組合就要看Sink這個接口了,該接口包含以下方法

方法名作用
void begin(long size)開始遍歷元素之前調(diào)用該方法,通知Sink做好準備。
void end()所有元素遍歷完成之后調(diào)用,通知Sink沒有更多的元素了。
boolean cancellationRequested()是否可以結(jié)束操作,可以讓短路操作盡早結(jié)束。
void accept(T t)遍歷元素時調(diào)用,接受一個待處理元素,并對元素進行處理。Stage把自己包含的操作和回調(diào)方法封裝到該方法里,前一個Stage只需要調(diào)用當前Stage.accept(T t)方法就行了。

通過以上方法,各個Stage之間的調(diào)用就實現(xiàn)了,每個Stage將自己的操作封裝到Sink中,然后只需要訪問下一個Stage的accept方法即可。

下面分別舉無狀態(tài)中間操作和有狀態(tài)中間操作對這幾個方法的實現(xiàn)。

首先是map的實現(xiàn)

// Stream.map()
    @Override
    @SuppressWarnings("unchecked")    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireN

轉(zhuǎn)載請標明出處:http://www.cnblogs.com/MoEee/p/6490573.html

http://www.cnblogs.com/MoEee/p/7229073.htm
l