中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

springboot~disruptor異步(bu)隊列

Disruptor

Disruptor是(shi)英國(guo)外匯交(jiao)易公司LMAX開(kai)發的一個高性能(neng)(neng)隊(dui)列,研(yan)發的初衷(zhong)是(shi)解決內(nei)存隊(dui)列的延遲問(wen)題(在性能(neng)(neng)測試中發現竟(jing)然與I/O操作處于同樣的數量級)。

Java內置隊列的問題

介紹Disruptor之前,我們先來看一看常用的線程安全的內置隊列有什么問題。Java的內置隊列如下表所示。

隊列的底層一般(ban)分成三(san)種:數組、鏈表和堆。其(qi)中,堆一般(ban)情況下是為了實現(xian)帶有優(you)先級(ji)特性的隊列,暫且不考慮。

從數(shu)(shu)組和(he)(he)鏈表(biao)兩(liang)種數(shu)(shu)據結構來(lai)看,基于(yu)數(shu)(shu)組線(xian)(xian)程(cheng)(cheng)(cheng)安(an)全(quan)的(de)隊列(lie),比較(jiao)典型的(de)是ArrayBlockingQueue,它主要(yao)通(tong)(tong)過加鎖(suo)的(de)方(fang)式(shi)(shi)來(lai)保證線(xian)(xian)程(cheng)(cheng)(cheng)安(an)全(quan);基于(yu)鏈表(biao)的(de)線(xian)(xian)程(cheng)(cheng)(cheng)安(an)全(quan)隊列(lie)分成LinkedBlockingQueue和(he)(he)ConcurrentLinkedQueue兩(liang)大類,前者(zhe)也通(tong)(tong)過鎖(suo)的(de)方(fang)式(shi)(shi)來(lai)實現線(xian)(xian)程(cheng)(cheng)(cheng)安(an)全(quan),而(er)后者(zhe)以及(ji)上面表(biao)格中的(de)LinkedTransferQueue都(dou)是通(tong)(tong)過原子變(bian)量compare and swap(以下簡稱“CAS”)這(zhe)種不(bu)加鎖(suo)的(de)方(fang)式(shi)(shi)來(lai)實現的(de)。

但是對(dui) volatile類型的變量進行 CAS 操作,存在偽共享問題,下(xia)面介(jie)紹(shao)一下(xia)

偽共享

CPU的(de)(de)緩(huan)存(cun)系統(tong)是以緩(huan)存(cun)行(xing)(cache line)為單位存(cun)儲的(de)(de),一般的(de)(de)大(da)小為64bytes。在(zai)多(duo)(duo)線程(cheng)(cheng)程(cheng)(cheng)序的(de)(de)執行(xing)過程(cheng)(cheng)中,存(cun)在(zai)著一種情況,多(duo)(duo)個(ge)需要(yao)頻繁修(xiu)改的(de)(de)變量存(cun)在(zai)同(tong)一個(ge)緩(huan)存(cun)行(xing)當(dang)中。

假設:有兩個線程分別訪問并修改X和Y這兩個變量,X和Y恰好在同一個緩存行上,這兩個線程分別在不同的CPU上執行。那么每個CPU分別更新好X和Y時將緩存行刷入內存時,發現有別的修改了各自緩存行內的數據,這時緩存行會失效,從L3中重新獲取。這樣的話,程序執行效率明顯下降。為了減少這種情況的發生,其實就是避免X和Y在同一個緩存行中,可以主動添加一些無關變量將緩存行填充滿,比如在X對象中添加一些變量,讓它有64 Byte那么大,正好占滿一個緩存行。

偽共享問題 的解決方案

簡單的說,就是 以空間換時間: 使用占位字節,將變量的所在的 緩沖行 塞滿。
disruptor 無鎖框架就是(shi)這(zhe)么(me)干的。

Disruptor框架是如何解決偽共享問題的?

在Disruptor中有一(yi)個(ge)重要的(de)類(lei)Sequence,該(gai)類(lei)包裝了一(yi)個(ge)volatile修(xiu)飾的(de)long類(lei)型數據value,無論是Disruptor中的(de)基于(yu)數組實現的(de)緩沖區(qu)(qu)RingBuffer,還是生產者,消費(fei)者,都有各(ge)自獨立的(de)Sequence,RingBuffer緩沖區(qu)(qu)中,Sequence標示(shi)著寫入(ru)進度,例如每次生產者要寫入(ru)數據進緩沖區(qu)(qu)時,都要調(diao)用(yong)RingBuffer.next()來獲得(de)下一(yi)個(ge)可使用(yong)的(de)相(xiang)對位置。對于(yu)生產者和消費(fei)者來說,Sequence標示(shi)著它們的(de)事件序號。

例子

/**
 * 停車場問題.
 * 1) 事件對象Event
 * 2)三個消費者Handler
 * 3)一個生產者Processer
 * 4)執行Main方法
 */
public class DisruptorCar {
    private static final Integer NUM = 1; // 1,10,100,1000

    /**
     * 測試入口 ,
     * 一個生產者(汽車進入停車場);
     * 三個消費者(一個記錄汽車信息,一個發送消息給系統,一個發送消息告知司機)
     * 前兩個消費者同步執行,都有結果了再執行第三個消費者
     */
    @Test
     public  void main() throws InterruptedException {
        long beginTime = System.currentTimeMillis();
        int bufferSize = 2048; // 2的N次方
        try {
            // 創建線程池,負責處理Disruptor的四個消費者
            ExecutorService executor = Executors.newFixedThreadPool(4);

            // 初始化一個 Disruptor
            Disruptor<MyInParkingDataEvent> disruptor = new Disruptor<MyInParkingDataEvent>(new EventFactory<MyInParkingDataEvent>() {
                @Override
                public MyInParkingDataEvent newInstance() {
                    return new MyInParkingDataEvent(); // Event 初始化工廠
                }
            }, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());

            // 使用disruptor創建消費者組 MyParkingDataInDbHandler 和 MyParkingDataToKafkaHandler
            EventHandlerGroup<MyInParkingDataEvent> handlerGroup = disruptor.handleEventsWith(
                    new MyParkingDataInDbHandler(), new MyParkingDataToKafkaHandler());

            // 當上面兩個消費者處理結束后在消耗 smsHandler
            MyParkingDataSmsHandler myParkingDataSmsHandler = new MyParkingDataSmsHandler();
            handlerGroup.then(myParkingDataSmsHandler);

            // 啟動Disruptor
            disruptor.start();

            CountDownLatch countDownLatch = new CountDownLatch(1); // 一個生產者線程準備好了就可以通知主線程繼續工作了
            // 生產者生成數據
            executor.submit(new MyInParkingDataEventPublisher(countDownLatch, disruptor));
            countDownLatch.await(); // 等待生產者結束

            disruptor.shutdown();
            executor.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("總耗時:" + (System.currentTimeMillis() - beginTime));
    }

    public class MyInParkingDataEvent {

        private String carLicense; // 車牌號

        public String getCarLicense() {
            return carLicense;
        }

        public void setCarLicense(String carLicense) {
            this.carLicense = carLicense;
        }

    }

    /**
     * Handler 第一個消費者,負責保存進場汽車的信息
     */
    public class MyParkingDataInDbHandler implements EventHandler<MyInParkingDataEvent>, WorkHandler<MyInParkingDataEvent> {

        @Override
        public void onEvent(MyInParkingDataEvent myInParkingDataEvent) throws Exception {
            long threadId = Thread.currentThread().getId(); // 獲取當前線程id
            String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
            System.out.println(String.format("Thread Id %s 保存 %s 到數據庫中 ....", threadId, carLicense));
        }

        @Override
        public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
                throws Exception {
            this.onEvent(myInParkingDataEvent);
        }

    }

    /**
     * 第二個消費者,負責發送通知告知工作人員(Kafka是一種高吞吐量的分布式發布訂閱消息系統)
     */
    public class MyParkingDataToKafkaHandler implements EventHandler<MyInParkingDataEvent> {

        @Override
        public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
                throws Exception {
            long threadId = Thread.currentThread().getId(); // 獲取當前線程id
            String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
            System.out.println(String.format("Thread Id %s 發送 %s 進入停車場信息給 kafka系統...", threadId, carLicense));
        }

    }

    /**
     * 第三個消費者,sms短信服務,告知司機你已經進入停車場,計費開始。
     */
    public class MyParkingDataSmsHandler implements EventHandler<MyInParkingDataEvent> {

        @Override
        public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
                throws Exception {
            long threadId = Thread.currentThread().getId(); // 獲取當前線程id
            String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
            System.out.println(String.format("Thread Id %s 給  %s 的車主發送一條短信,并告知他計費開始了 ....", threadId, carLicense));
        }

    }

    /**
     * 生產者,進入停車場的車輛
     */
    public class MyInParkingDataEventPublisher implements Runnable {

        private CountDownLatch countDownLatch; // 用于監聽初始化操作,等初始化執行完畢后,通知主線程繼續工作
        private Disruptor<MyInParkingDataEvent> disruptor;

        public MyInParkingDataEventPublisher(CountDownLatch countDownLatch,
                                             Disruptor<MyInParkingDataEvent> disruptor) {
            this.countDownLatch = countDownLatch;
            this.disruptor = disruptor;
        }

        @Override
        public void run() {
            MyInParkingDataEventTranslator eventTranslator = new MyInParkingDataEventTranslator();
            try {
                for (int i = 0; i < NUM; i++) {
                    disruptor.publishEvent(eventTranslator);
                    Thread.sleep(1000); // 假設一秒鐘進一輛車
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown(); // 執行完畢后通知 await()方法
                System.out.println(NUM + "輛車已經全部進入進入停車場!");
            }
        }

    }

    class MyInParkingDataEventTranslator implements EventTranslator<MyInParkingDataEvent> {

        @Override
        public void translateTo(MyInParkingDataEvent myInParkingDataEvent, long sequence) {
            this.generateData(myInParkingDataEvent);
        }

        private MyInParkingDataEvent generateData(MyInParkingDataEvent myInParkingDataEvent) {
            myInParkingDataEvent.setCarLicense("車牌號: 鄂A-" + (int) (Math.random() * 100000)); // 隨機生成一個車牌號
            System.out.println("Thread Id " + Thread.currentThread().getId() + " 寫完一個event");
            return myInParkingDataEvent;
        }

    }

}

posted @ 2021-06-05 19:14  張占嶺  閱讀(1977)  評論(0)    收藏  舉報