中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

springboot~SSE做消息推送

一、SSE是什么?

SSE技術是基于單(dan)工通信模式,只是單(dan)純的客戶端(duan)(duan)(duan)向(xiang)服(fu)務端(duan)(duan)(duan)發(fa)送請(qing)(qing)求,服(fu)務端(duan)(duan)(duan)不(bu)會主動發(fa)送給(gei)(gei)客戶端(duan)(duan)(duan)。服(fu)務端(duan)(duan)(duan)采(cai)取的策略(lve)是抓住這個請(qing)(qing)求不(bu)放,等(deng)數據更新的時候(hou)才返回給(gei)(gei)客戶端(duan)(duan)(duan),當客戶端(duan)(duan)(duan)接收到(dao)消息后,再向(xiang)服(fu)務端(duan)(duan)(duan)發(fa)送請(qing)(qing)求,周而復(fu)始。

  • 注意:因為EventSource對象是SSE的客戶端,可能會有瀏覽器對其不支持,但谷歌、火狐、360是可以的,IE不可以。
  • 優點:SSE和WebSocket相比,最大的優勢是便利,服務端不需要其他的類庫,開發難度較低,SSE和輪詢相比它不用處理很多請求,不用每次建立新連接,延遲較低。
  • 缺點:如果客戶端有很多,那就要保持很多長連接,這會占用服務器大量內存和連接數
  • sse 規范:在 html5 的定義中,服務端 sse,一般需要遵循以下要求:
  Content-Type: text/event-stream;
  charset=UTF-8Cache-Control: no-cache
  Connection: keep-alive

實現一個例子

后端代碼

       /**
	* 用于創建連接
	*/
	@GetMapping("/connect/{userId}")
	public SseEmitter connect(@PathVariable String userId) {
		return SseEmitterUtil.connect(userId);
	}

	/**
	 * 推送給所有人
	 * @param message
	 * @return
	 */
	@GetMapping("/push/{message}")
	public ResponseEntity<String> push(@PathVariable(name = "message") String message) {
		// 獲取連接人數
		int userCount = SseEmitterUtil.getUserCount();
		// 如果無在線人數,返回
		if (userCount < 1) {
			return ResponseEntity.status(500).body("無人在線!");
		}
		SseEmitterUtil.batchSendMessage(message);
		return ResponseEntity.ok("發送成功!");
	}

SseEmitterUtil代碼

public class SseEmitterUtil {

	/**
	 * 當前連接數
	 */
	private static AtomicInteger count = new AtomicInteger(0);

	/**
	 * 使用map對象,便于根據userId來獲取對應的SseEmitter,或者放redis里面
	 */
	private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

	/**
	 * 創建用戶連接并返回 SseEmitter
	 * @param userId 用戶ID
	 * @return SseEmitter
	 */
	public static SseEmitter connect(String userId) {
		// 設置超時時間,0表示不過期。默認30秒,超過時間未完成會拋出異常:AsyncRequestTimeoutException
		SseEmitter sseEmitter = new SseEmitter(0L);
		// 注冊回調
		sseEmitter.onCompletion(completionCallBack(userId));
		sseEmitter.onError(errorCallBack(userId));
		sseEmitter.onTimeout(timeoutCallBack(userId));
		sseEmitterMap.put(userId, sseEmitter);
		// 數量+1
		count.getAndIncrement();
		log.info("創建新的sse連接,當前用戶:{}", userId);
		return sseEmitter;
	}

	/**
	 * 給指定用戶發送信息
	 */
	public static void sendMessage(String userId, String message) {
		if (sseEmitterMap.containsKey(userId)) {
			try {
				// sseEmitterMap.get(userId).send(message, MediaType.APPLICATION_JSON);
				sseEmitterMap.get(userId).send(message);
			}
			catch (IOException e) {
				log.error("用戶[{}]推送異常:{}", userId, e.getMessage());
				removeUser(userId);
			}
		}
	}

	/**
	 * 群發消息
	 */
	public static void batchSendMessage(String wsInfo, List<String> ids) {
		ids.forEach(userId -> sendMessage(wsInfo, userId));
	}

	/**
	 * 群發所有人
	 */
	public static void batchSendMessage(String wsInfo) {
		sseEmitterMap.forEach((k, v) -> {
			try {
				v.send(wsInfo, MediaType.APPLICATION_JSON);
			}
			catch (IOException e) {
				log.error("用戶[{}]推送異常:{}", k, e.getMessage());
				removeUser(k);
			}
		});
	}

	/**
	 * 移除用戶連接
	 */
	public static void removeUser(String userId) {
		sseEmitterMap.remove(userId);
		// 數量-1
		count.getAndDecrement();
		log.info("移除用戶:{}", userId);
	}

	/**
	 * 獲取當前連接信息
	 */
	public static List<String> getIds() {
		return new ArrayList<>(sseEmitterMap.keySet());
	}

	/**
	 * 獲取當前連接數量
	 */
	public static int getUserCount() {
		return count.intValue();
	}

	private static Runnable completionCallBack(String userId) {
		return () -> {
			log.info("結束連接:{}", userId);
			removeUser(userId);
		};
	}

	private static Runnable timeoutCallBack(String userId) {
		return () -> {
			log.info("連接超時:{}", userId);
			removeUser(userId);
		};
	}

	private static Consumer<Throwable> errorCallBack(String userId) {
		return throwable -> {
			log.info("連接異常:{}", userId);
			removeUser(userId);
		};
	}

}

前端代碼

<script>
    let source = null;

    // 用時間戳模擬登錄用戶
    const userId = new Date().getTime();

    if (window.EventSource) {

        // 建立連接
        source = new EventSource('/sse/connect/' + userId);

        /**
         * 連接一旦建立,就會觸發open事件
         * 另一種寫法:source.onopen = function (event) {}
         */
        source.addEventListener('open', function (e) {
            setMessageInnerHTML("建立連接。。。");
        }, false);

        /**
         * 客戶端收到服務器發來的數據
         * 另一種寫法:source.onmessage = function (event) {}
         */
        source.addEventListener('message', function (e) {
            setMessageInnerHTML(e.data);
        });


        /**
         * 如果發生通信錯誤(比如連接中斷),就會觸發error事件
         * 或者:
         * 另一種寫法:source.onerror = function (event) {}
         */
        source.addEventListener('error', function (e) {
            if (e.readyState === EventSource.CLOSED) {
                setMessageInnerHTML("連接關閉");
            } else {
                console.log(e);
            }
        }, false);

    } else {
        setMessageInnerHTML("你的瀏覽器不支持SSE");
    }

    // 監聽窗口關閉事件,主動去關閉sse連接,如果服務端設置永不過期,瀏覽器關閉后手動清理服務端數據
    window.onbeforeunload = function () {
        closeSse();
    };

    // 關閉Sse連接
    function closeSse() {
        source.close();
        const httpRequest = new XMLHttpRequest();
        httpRequest.open('GET', '/sse/close/' + userId, true);
        httpRequest.send();
        console.log("close");
    }

    // 將消息顯示在網頁上
    function setMessageInnerHTML(innerHTML) {
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }
</script>
posted @ 2023-05-04 17:16  張占嶺  閱讀(4253)  評論(0)    收藏  舉報