国产毛多水多高潮高清,久热这里只有精品视频6,国内精品久久久久久久久电影网,国产男同志CHINA69,精品999日本久久久影院,人人妻人人澡人人爽人人精品,亚洲中文无码永久免

造轮子之MemorySafeLinkedBlockingQueue-开运棋牌赞助摩洛哥

造轮子之MemorySafeLinkedBlockingQueue

2026-01-17 08:47:36投稿人:開云網(wǎng)賭安全的首頁(蕪湖)有限公司圍觀4763 評(píng)論

造輪子之MemorySafeLinkedBlockingQueue-LinkBlockingQueue改進(jìn)

LinkBlockingQueue改進(jìn)

問題背景

https://github.com/apache/dubbo/pull/9722/files
使用線程池的同學(xué)對(duì)于標(biāo)題中的隊(duì)列想必都有過使用,但上述隊(duì)列使用不當(dāng)時(shí)則會(huì)造成程序OOM ,那怎么來控制呢?

使用ArrayBlockingQueue ?如何來評(píng)估長度 ?

是否有一個(gè)完美的解決方案呢 ,MemorySafeLinkedBlockingQueue則通過對(duì)內(nèi)存的限制判斷盡面控制隊(duì)列的容量 ,完成解決了可能存在的OOM問題 。

獲取內(nèi)存大?。ㄗ?:單位大B;支持準(zhǔn)實(shí)時(shí)更新) :

Runtime.getRuntime().freeMemory()//JVM中已經(jīng)申請(qǐng)到的堆內(nèi)存中還未使用的大小Runtime.getRuntime().maxMemory()// JVM可從操作系統(tǒng)申請(qǐng)到的最大內(nèi)存值 -XxmRuntime.getRuntime().totalMemory()// JVM已從操作系統(tǒng)申請(qǐng)到的內(nèi)存大小 —Xxs可設(shè)置該值大小-初始堆的大小

線程池在excute任務(wù)時(shí),放隊(duì)列,放不進(jìn)去 ,使用新線程運(yùn)行任務(wù)。這個(gè)放不進(jìn)行 ,是使用的offer??非阻塞方法嗎 ?

參考:https://blog.csdn.net/weixin_43108539/article/details/125190023

public void execute(Runnable command) {         if (command == null)            throw new NullPointerException();     	//拿到32位的int        int c = ctl.get();     	//工作線程數(shù)<核心線程數(shù)        if (workerCountOf(c) < corePoolSize) {             //進(jìn)入if,代表可以創(chuàng)建 核心 線程數(shù)            if (addWorker(command, true))                return;            //如果沒進(jìn)入if,代表創(chuàng)建核心線程數(shù)失敗,重新獲取 ctl            c = ctl.get();        }        //判斷線程池為Running狀態(tài),將任務(wù)添加入阻塞隊(duì)列,使用offer        if (isRunning(c) && workQueue.offer(command)) {             int recheck = ctl.get();            //再次判斷是否為Running狀態(tài),若不是Running狀態(tài),remove任務(wù)            if (! isRunning(recheck) && remove(command))                reject(command);            //如果線程池在Running狀態(tài),線程池?cái)?shù)量為0            else if (workerCountOf(recheck) == 0)                //阻塞隊(duì)列有任務(wù),但是沒有工作線程,添加一個(gè)任務(wù)為空的工作線程處理阻塞隊(duì)列中的任務(wù)                addWorker(null, false);        }        //阻塞隊(duì)列已滿,創(chuàng)建非核心線程,拒絕策略-addWorker中有判斷核心線程數(shù)是否超過最大線程數(shù)        else if (!addWorker(command, false))            reject(command);    }

空閑內(nèi)存計(jì)算

package com.zte.sdn.oscp.queue;import cn.hutool.core.thread.NamedThreadFactory;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicBoolean;public class MemoryLimitCalculator {     private static volatile long maxAvailable;    private static final AtomicBoolean refreshStarted = new AtomicBoolean(false);    private static void refresh() {         maxAvailable = Runtime.getRuntime().freeMemory();    }    private static void checkAndScheduleRefresh() {         if (!refreshStarted.get()) {             // immediately refresh when first call to prevent maxAvailable from being 0            // to ensure that being refreshed before refreshStarted being set as true            // notice: refresh may be called for more than once because there is no lock            refresh();            if (refreshStarted.compareAndSet(false, true)) {                 ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-Memory-Calculator"));                // check every 50 ms to improve performance                scheduledExecutorService.scheduleWithFixedDelay(MemoryLimitCalculator::refresh, 50, 50, TimeUnit.MILLISECONDS);                Runtime.getRuntime().addShutdownHook(new Thread(() ->{                     refreshStarted.set(false);                    scheduledExecutorService.shutdown();                }));            }        }    }    /**     * Get the maximum available memory of the current JVM.     *     * @return maximum available memory     */    public static long maxAvailable() {         checkAndScheduleRefresh();        return maxAvailable;    }    /**     * Take the current JVM's maximum available memory     * as a percentage of the result as the limit.     *     * @param percentage percentage     * @return available memory     */    public static long calculate(final float percentage) {         if (percentage <= 0 || percentage >1) {             throw new IllegalArgumentException();        }        checkAndScheduleRefresh();        return (long) (maxAvailable() * percentage);    }    /**     * By default, it takes 80% of the maximum available memory of the current JVM.     *     * @return available memory     */    public static long defaultLimit() {         checkAndScheduleRefresh();        return (long) (maxAvailable() * 0.8);    }}

內(nèi)存安全隊(duì)列

package com.zte.sdn.oscp.queue;import java.util.Collection;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;public class MemorySafeLinkedBlockingQueueextends LinkedBlockingQueue{     private static final long serialVersionUID = 8032578371739960142L;    public static int THE_256_MB = 256 * 1024 * 1024;    private int maxFreeMemory;    private Rejectorrejector;    public MemorySafeLinkedBlockingQueue() {         this(THE_256_MB);    }    public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {         super(Integer.MAX_VALUE);        this.maxFreeMemory = maxFreeMemory;        //default as DiscardPolicy to ensure compatibility with the old version        this.rejector = new DiscardPolicy<>();    }    public MemorySafeLinkedBlockingQueue(final Collection<? extends E>c,                                         final int maxFreeMemory) {         super(c);        this.maxFreeMemory = maxFreeMemory;        //default as DiscardPolicy to ensure compatibility with the old version        this.rejector = new DiscardPolicy<>();    }    /**     * set the max free memory.     *     * @param maxFreeMemory the max free memory     */    public void setMaxFreeMemory(final int maxFreeMemory) {         this.maxFreeMemory = maxFreeMemory;    }    /**     * get the max free memory.     *     * @return the max free memory limit     */    public int getMaxFreeMemory() {         return maxFreeMemory;    }    /**     * set the rejector.     *     * @param rejector the rejector     */    public void setRejector(final Rejectorrejector) {         this.rejector = rejector;    }    /**     * determine if there is any remaining free memory.     *     * @return true if has free memory     */    public boolean hasRemainedMemory() {         return MemoryLimitCalculator.maxAvailable() >maxFreeMemory;    }    @Override    public void put(final E e) throws InterruptedException {         if (hasRemainedMemory()) {             super.put(e);        } else {             rejector.reject(e, this);        }    }    @Override    public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {         if (!hasRemainedMemory()) {             rejector.reject(e, this);            return false;        }        return super.offer(e, timeout, unit);    }    @Override    public boolean offer(final E e) {         if (!hasRemainedMemory()) {             rejector.reject(e, this);            return false;        }        return super.offer(e);    }}

拒絕策略

注意其中的rejector是拒絕策略,默認(rèn)的DiscardPolicy什么也不處理;

而DiscardOldPolicy的處理邏輯很簡單

public class DiscardOldestPolicyimplements Rejector{     @Override    public void reject(final E e, final Queuequeue) {         queue.poll();        queue.offer(e);    }}

AbortPolicy則直接拋出異常

public class AbortPolicyimplements Rejector{     @Override    public void reject(final E e, final Queuequeue) {         throw new RejectException("no more memory can be used !");    }}

個(gè)人建議增加日志打印即可 。

展開閱讀全文

投稿時(shí)間 :2022-09-09  最后更新 :2022-09-09