Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

10.11. Flow

10.11.1. 自定义 Publisher / Subscriber

			
package cn.netkiller.test;

import java.util.concurrent.Flow;

/**
 * Minimal hand-written {@link Flow.Publisher} / {@link Flow.Subscriber} pair.
 *
 * <p>The publisher emits the integers 1 through 5. Unlike a naive "push everything
 * inside subscribe()" implementation, it follows the {@code Flow} handshake: the
 * subscriber first receives a {@link Flow.Subscription} through {@code onSubscribe},
 * and items are only delivered while the subscriber has outstanding demand.
 *
 * <p>Everything here runs synchronously on the calling thread; the classes are not
 * designed for concurrent use.
 */
public class Test {
    public static void main(String[] args) {
        IntPublisher intPublisher = new IntPublisher();
        IntSubscriber intSubscriber = new IntSubscriber();
        intPublisher.subscribe(intSubscriber);
    }

    /** Publishes the fixed range 1..5 to each subscriber, honoring requested demand. */
    public static class IntPublisher implements Flow.Publisher<Integer> {
        private static final int FIRST = 1;
        private static final int LAST = 5;

        /**
         * Hands the subscriber a fresh subscription; nothing is emitted until the
         * subscriber calls {@link Flow.Subscription#request(long)}.
         *
         * @param subscriber the receiver of the items
         * @throws NullPointerException if {@code subscriber} is null
         */
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            if (subscriber == null) {
                throw new NullPointerException("subscriber");
            }
            subscriber.onSubscribe(new RangeSubscription(subscriber));
        }

        /** Per-subscriber state: the next value to emit and the outstanding demand. */
        private static final class RangeSubscription implements Flow.Subscription {
            private final Flow.Subscriber<? super Integer> subscriber;
            private int next = FIRST;
            private long demand;
            private boolean emitting;
            private boolean done;

            RangeSubscription(Flow.Subscriber<? super Integer> subscriber) {
                this.subscriber = subscriber;
            }

            /**
             * Adds {@code n} to the outstanding demand and emits as many items as
             * the demand allows. A non-positive {@code n} terminates the stream
             * with an {@link IllegalArgumentException} via {@code onError}.
             */
            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
                    return;
                }
                // Saturate instead of overflowing when callers request Long.MAX_VALUE.
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
                if (emitting) {
                    // Re-entrant call from inside onNext: the loop below picks up
                    // the new demand, so we avoid unbounded recursion.
                    return;
                }
                emitting = true;
                try {
                    while (!done && demand > 0 && next <= LAST) {
                        demand--;
                        int value = next++;
                        System.out.println("Publishing = " + value);
                        // Send the item to the subscriber.
                        subscriber.onNext(value);
                    }
                    if (!done && next > LAST) {
                        done = true;
                        // Signal completion.
                        subscriber.onComplete();
                    }
                } finally {
                    emitting = false;
                }
            }

            /** Stops any further emission to this subscriber. */
            @Override
            public void cancel() {
                done = true;
            }
        }
    }

    /** Prints every signal it receives and pulls items one at a time. */
    public static class IntSubscriber implements Flow.Subscriber<Integer> {
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            System.out.println("onSubscribe");
            // Ask for the first item; each onNext asks for one more.
            subscription.request(1);
        }

        @Override
        public void onNext(Integer item) {
            System.out.println("onNext: " + item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("onError:" + throwable);
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    }
}
			
			

10.11.2. SubmissionPublisher

		
package cn.netkiller.test;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.IntStream;

/**
 * Demonstrates {@link SubmissionPublisher}: one subscriber pulls strings one at a
 * time while the main thread submits them.
 *
 * <p>The main thread waits on a latch that the subscriber releases when it receives
 * its terminal signal, instead of waiting a fixed amount of time and hoping that
 * delivery has finished.
 */
public class SubmissionPublisherTest {
    public static void main(String[] args) throws InterruptedException {
        System.out.println(Thread.currentThread().getName());

        // Released by the subscriber on onComplete/onError so main knows when delivery is over.
        final java.util.concurrent.CountDownLatch finished = new java.util.concurrent.CountDownLatch(1);

        // 1. Create the subscriber that receives the publisher's messages.
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                System.out.println("onSubscribe");
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                System.out.println("onNext: " + item);
                this.subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                // onError is terminal, so the subscription is not touched here.
                System.out.println("Throwable: " + throwable.getMessage());
                finished.countDown();
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
                finished.countDown();
            }
        };

        // 2. Create the String publisher; try-with-resources closes it once
        //    everything has been submitted, even if submitting throws.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            // 3. Connect the subscriber to the publisher.
            publisher.subscribe(subscriber);
            publisher.submit("https://www.netkiller.cn");

            // 4. Publish the messages, echoing each one after it has been submitted.
            IntStream.range(0, 11)
                    .mapToObj(i -> "publisher -----> message " + i)
                    .forEach(message -> {
                        publisher.submit(message);
                        System.out.println(message);
                    });
        }

        // 5. Items are delivered asynchronously; block until the subscriber has
        //    seen the terminal signal so the JVM does not exit mid-delivery.
        finished.await();
    }

}
		
			
			
main
onSubscribe
onNext: https://www.netkiller.cn
publisher -----> message 0
publisher -----> message 1
publisher -----> message 2
publisher -----> message 3
publisher -----> message 4
onNext: publisher -----> message 0
onNext: publisher -----> message 1
onNext: publisher -----> message 2
onNext: publisher -----> message 3
onNext: publisher -----> message 4
onNext: publisher -----> message 5
publisher -----> message 5
publisher -----> message 6
publisher -----> message 7
publisher -----> message 8
publisher -----> message 9
publisher -----> message 10
onNext: publisher -----> message 6
onNext: publisher -----> message 7
onNext: publisher -----> message 8
onNext: publisher -----> message 9
onNext: publisher -----> message 10
onComplete			
			
			

10.11.3. Flow.Processor

			
package cn.netkiller.test;


import lombok.SneakyThrows;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * Demonstrates a {@link Flow.Processor}: strings submitted to a
 * {@link SubmissionPublisher} pass through {@link UpperCaseProcessor} before
 * reaching the final subscriber.
 *
 * <p>The main thread waits on a latch released by the final subscriber's terminal
 * signal rather than pausing for a fixed time.
 */
public class Test {
    public static void main(String[] args) {
        // Released when the downstream subscriber receives onComplete/onError.
        final var finished = new java.util.concurrent.CountDownLatch(1);

        try (final var publisher = new SubmissionPublisher<String>()) {

            final var processor = new UpperCaseProcessor();
            publisher.subscribe(processor);

            processor.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    System.out.println("Subscriber.onSubscribe");
                    subscription.request(1);
                }

                @Override
                public void onNext(String item) {
                    System.out.println("Subscriber.onNext: " + item);
                    this.subscription.request(1);
                }

                @Override
                public void onError(Throwable throwable) {
                    System.out.println("Subscriber.onError: " + throwable);
                    finished.countDown();
                }

                @Override
                public void onComplete() {
                    System.out.println("Subscriber.onComplete");
                    finished.countDown();
                }
            });

            publisher.submit("abc");
            publisher.submit("xyz");
            publisher.submit("neo");
            publisher.submit("netkiller");
        }

        // Closing the publisher completes the processor, which in turn closes itself
        // and completes the subscriber; wait for that chain to finish.
        try {
            finished.await();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever invoked main.
            Thread.currentThread().interrupt();
        }
    }


    /**
     * Receives strings from upstream, upper-cases them and re-publishes them to
     * its own subscribers. Upstream completion or failure is forwarded by closing
     * this publisher normally or exceptionally.
     */
    static class UpperCaseProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {

        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            System.out.println("Processor.onSubscribe");
            subscription.request(1);
        }

        @Override
        public void onNext(String item) {
            System.out.println("Processor.onNext: " + item);
            // Locale.ROOT keeps the result independent of the JVM's default locale
            // (e.g. a Turkish default would map 'i' to a dotted capital I).
            submit(item.toUpperCase(java.util.Locale.ROOT));
            this.subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("Processor.onError: " + throwable.getMessage());
            closeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            System.out.println("Processor.onComplete");
            close();
        }
    }
}
			
			
			
			
Processor.onSubscribe
Subscriber.onSubscribe
Processor.onNext: abc
Processor.onNext: xyz
Processor.onNext: neo
Processor.onNext: netkiller
Processor.onComplete
Subscriber.onNext: ABC
Subscriber.onNext: XYZ
Subscriber.onNext: NEO
Subscriber.onNext: NETKILLER
Subscriber.onComplete