中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

java~并行計(ji)算(suan)~大(da)集合的并行處理

上一次寫了關于《FunctionalInterface~一個批量處理數據的類》和《Future和Callable實現大任務的并行處理》的(de)文章,本講主(zhu)要結合(he)(he)實際應(ying)用,來封裝(zhuang)一(yi)個集合(he)(he)并行處理(li)組件,我們的(de)集合(he)(he)分為數(shu)據庫查詢出現的(de)分頁集合(he)(he);還有(you)一(yi)個是(shi)內存的(de)集合(he)(he),今(jin)天主(zhu)要說(shuo)一(yi)下內存集合(he)(he)的(de)并行處理(li)。

場景介紹

  • 有一個比較耗時的工作,將top 400的用戶的行為信息統計
  • 統計的信息來自很多業務,很多服務,不能使用聚合直接計算
  • 這些業務統計的時間,大概每個人平均需要1秒
  • 這些用戶的各種類型,彼此獨立,沒有關系

如何設計

如果直接順序寫(xie)代(dai)碼,那(nei)1萬(wan)的用(yong)戶,需要400秒的時間(jian),這是我們(men)(men)不能接受的,我們(men)(men)使用(yong)并行(xing)編程(cheng)8秒就把(ba)它搞(gao)定。

如何實現

  • 400的集合,進行拆分,每100個為一組,分為4組(4頁)
  • 對每100個集合進行拆分,每2個為1組,將100個分成了50組
  • 對50組數據,開50個線程并行處理,結果為2行完成
  • 400的信息,分成了4頁,每頁2秒,一共8秒

代碼實現

/**
 * 數據集并行處理工具
 */
public class DataHelper {
    /**
     * 并行處理線程數字
     */
    static final int THREAD_COUNT = 50;
    /**
     * 單線程中處理的集合的長度,50個線程,每個線程處理2條,如果處理時間為1S,則需要2S的時間.
     */
    static final int INNER_LIST_LENGTH = 2;
    static Logger logger = LoggerFactory.getLogger(DataHelper.class);

    /**
     * 大集合拆分.
     *
     * @param list
     * @param len
     * @param <T>
     * @return
     */
    private static <T> List<List<T>> splitList(List<T> list, int len) {
        if (list == null || list.size() == 0 || len < 1) {
            return null;
        }
        List<List<T>> result = new ArrayList<List<T>>();
        int size = list.size();
        int count = (size + len - 1) / len;
        for (int i = 0; i < count; i++) {
            List<T> subList = list.subList(i * len, ((i + 1) * len > size ? size : len * (i + 1)));
            result.add(subList);
        }
        return result;
    }

    /**
     * 并行處理.
     *
     * @param list     大集合
     * @param pageSize 單頁數據大小
     * @param consumer 處理程序
     * @param <T>
     */
    public static <T> void fillDataByPage(List<T> list,
                                          int pageSize,
                                          Consumer<T> consumer) {

        List<List<T>> innerList = new ArrayList<>();
        splitList(list, pageSize).forEach(o -> innerList.add(o));
        int totalPage = innerList.size();
        AtomicInteger i = new AtomicInteger();
        innerList.forEach(items -> {
            ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
            i.getAndIncrement();
            Collection<BufferInsert<T>> bufferInserts = new ArrayList<>();
            splitList(items, INNER_LIST_LENGTH).forEach(o -> {
                bufferInserts.add(new BufferInsert(o, consumer));
            });

            try {
                executor.invokeAll(bufferInserts);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            executor.shutdown();
            logger.info("【當前數據頁:{}/{}】", i.get(), totalPage);
        });

    }

    /**
     * 多線程并發處理數據.
     *
     * @param <T>
     */
    static class BufferInsert<T> implements Callable<Integer> {
        /**
         * 要處理的數據列表.
         */
        List<T> items;
        /**
         * 處理程序.
         */
        Consumer<T> consumer;

        public BufferInsert(List<T> items, Consumer<T> consumer) {
            this.items = items;
            this.consumer = consumer;
        }

        @Override
        public Integer call() {
            for (T item : items) {
                this.consumer.accept(item);
            }
            return 1;
        }
    }

}

調用代碼

    /**
     * 8秒處理400個任務,每個任務執行時間為1S,并行的威力
     */
    @Test
    public void test() {
        List<Integer> sumList = new ArrayList<>();
        for (int i = 0; i < 400; i++) {
            sumList.add(i);
        }
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        DataHelper.fillDataByPage(sumList, 100, (o) -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        stopWatch.stop();
        System.out.println("time:" + stopWatch.getTotalTimeMillis());
    }

結果截圖

posted @ 2021-09-10 14:22  張占嶺  閱讀(1163)  評論(0)    收藏  舉報