EF架構~通過(guo)EF6的(de)(de)DbCommand攔截器來實現數據(ju)庫(ku)讀(du)寫分離~再續~添加對各(ge)只(zhi)讀(du)服(fu)務(wu)器的(de)(de)心(xin)跳檢測
上一講中基本實現了對數據庫的讀寫分離,而(er)在(zai)選擇(ze)只讀數(shu)據(ju)庫上只是隨機選擇(ze),并沒有去檢(jian)測(ce)數(shu)據(ju)庫服(fu)務(wu)(wu)(wu)器是否有效,如服(fu)務(wu)(wu)(wu)器掛了(le),SQL服(fu)務(wu)(wu)(wu)停了(le),端口被封了(le)等(deng)(deng)等(deng)(deng),而(er)本講主(zhu)要對以上功能進行一(yi)個(ge)(ge)實(shi)現,并對配置文件也進行了(le)一(yi)些優化(hua),讓它更好(hao)的(de)支(zhi)持多個(ge)(ge)數(shu)據(ju)庫服(fu)務(wu)(wu)(wu)器,分(fen)別配置各(ge)個(ge)(ge)的(de)賬(zhang)號(hao)和密碼(ma)及數(shu)據(ju)庫服(fu)務(wu)(wu)(wu)端口等(deng)(deng)等(deng)(deng),接下來,就來看(kan)一(yi)下主(zhu)要的(de)代碼(ma)吧。
一 配置文件
<!-- ef實現對sql讀寫分離的配置,sqlserver端采用發布與訂閱實現 -->
<add key="readDb" value="
192.168.2.71|1433|background_read1|sa|zzl123,
192.168.2.71|1433|TestWrite_Read_Zzl|sa|zzl123,
192.168.2.29|1433|TestWrite_Read_Zzl|sa|1"
/>
<!-- 只讀服務器的sql連接串配置模版-->
<add key ="readDbConnectioin" value="data source={0};initial catalog={1};persist security info=True;user id={2};password={3};multipleactiveresultsets=True;application name=EntityFramework"/>
二(er) 數據庫配置實體類
/// <summary> /// 只讀數據庫配(pei)置(zhi)實體(ti) /// </summary> public class ReadDbConfig { public ReadDbConfig() { Port = 1433; UserId = "sa"; } public string Ip { get; set; } public int Port { get; set; } public string DbName { get; set; } public string UserId { get; set; } public string Password { get; set; } }
三 對SQL攔(lan)截器(qi)進行優化,添(tian)加(jia)了TCP的(de)心跳檢測
lock (lockObj) { if (readConnList != null && readConnList.Any()) { foreach (var item in readConnList) { //心跳(tiao)測試,將死(si)掉的服務器IP從列表中移除 var client = new TcpClient(); try { client.Connect(new IPEndPoint(IPAddress.Parse(item.Ip), item.Port)); } catch (SocketException) { //異常,沒有(you)連接上 readConnList.Remove(item); } if (!client.Connected) { readConnList.Remove(item); } } } }
四 對于數據庫(ku)庫(ku)端還是(shi)這(zhe)(zhe)前通過發布和(he)訂閱實現的,需要(yao)注意的是(shi),這(zhe)(zhe)些功(gong)能需要(yao)使(shi)用“機(ji)器名(ming)”進行鏈接,使(shi)用ip和(he)域(yu)名(ming)都是(shi)無效的
五 下面貢獻一下完成的攔截器代碼
/// <summary> /// SQL命令攔(lan)截器 /// 主要實現EF的讀寫分離 /// </summary> public class SqlCommandInterceptor : DbCommandInterceptor { static SqlCommandInterceptor() { InitConfig(); initSysTimer.Enabled = true; initSysTimer.Elapsed += initSysTimer_Elapsed; initSysTimer.Start(); sysTimer.Enabled = true; sysTimer.Elapsed += sysTimer_Elapsed; sysTimer.Start(); } private static object lockObj = new object(); /// <summary> /// 定期找沒有在(zai)線的數據(ju)庫服務器 /// </summary> private static Timer sysTimer = new Timer(6); /// <summary> /// 系統配置文(wen)件輪訓讀時間(jian)間(jian)隔 /// </summary> private static Timer initSysTimer = new Timer(60000 * 10); /// <summary> /// 讀(du)庫,從庫集群,寫(xie)庫不用設置走默認的EF框架 /// </summary> private static List<ReadDbConfig> readConnList; /// <summary> /// 配置(zhi)初始化 /// </summary> private static void InitConfig() { lock (lockObj) { var temp = new List<ReadDbConfig>(); var str = System.Configuration.ConfigurationManager.AppSettings["readDb"] ?? string.Empty; var readList = str.Split(new char[] { ',' }, StringSplitOptions.RemoveEmptyEntries); if (readList != null && readList.Any()) { foreach (var item in readList) { var configArr = item.Split(new char[] { '|' }, StringSplitOptions.RemoveEmptyEntries); temp.Add(new ReadDbConfig { Ip = configArr[0], Port = int.Parse(configArr[1]), DbName = configArr[2], UserId = configArr[3], Password = configArr[4], }); } } readConnList = temp; } } #region Private Methods private static void initSysTimer_Elapsed(object sender, ElapsedEventArgs e) { InitConfig(); } /// <summary> /// 輪詢服(fu)務(wu) /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private static void sysTimer_Elapsed(object sender, ElapsedEventArgs e) { lock (lockObj) { if (readConnList != null && readConnList.Any()) { foreach (var item in readConnList) { //心跳測試(shi),將(jiang)死掉的服務(wu)器IP從列(lie)表中(zhong)移除 var client = new TcpClient(); try { client.Connect(new IPEndPoint(IPAddress.Parse(item.Ip), item.Port)); } catch (SocketException) { //異(yi)常(chang),沒有連接上 readConnList.Remove(item); } if (!client.Connected) { readConnList.Remove(item); } } } } } /// <summary> /// 處(chu)理(li)讀庫(ku)字符串 /// </summary> /// <returns></returns> private string GetReadConn() { if (readConnList != null && readConnList.Any()) { var resultConn = readConnList[Convert.ToInt32(Math.Floor((double)new Random().Next(0, readConnList.Count)))]; return string.Format(System.Configuration.ConfigurationManager.AppSettings["readDbConnectioin"] , resultConn.Ip , resultConn.DbName , resultConn.UserId , resultConn.Password); } return string.Empty; } /// <summary> /// 只讀(du)庫的(de)選(xuan)擇,加工command對象 /// </summary> /// <param name="command"></param> private void ReadDbSelect(DbCommand command) { if (!string.IsNullOrWhiteSpace(GetReadConn()))//如果配置了讀寫(xie)分離,就去(qu)實現 { if (!command.CommandText.StartsWith("insert", StringComparison.InvariantCultureIgnoreCase)) { command.Connection.Close(); command.Connection.ConnectionString = GetReadConn(); command.Connection.Open(); } } } #endregion #region Override Methods /// <summary> /// Linq to Entity生成的(de)update,delete /// </summary> /// <param name="command"></param> /// <param name="interceptionContext"></param> public override void NonQueryExecuting(DbCommand command, DbCommandInterceptionContext<int> interceptionContext) { base.NonQueryExecuting(command, interceptionContext);//update,delete等寫操作直接走主庫 } /// <summary> /// 執行sql語(yu)句,并返(fan)(fan)回第一(yi)行第一(yi)列,沒有找到返(fan)(fan)回null,如果數(shu)據(ju)庫中值(zhi)為null,則返(fan)(fan)回 DBNull.Value /// </summary> /// <param name="command"></param> /// <param name="interceptionContext"></param> public override void ScalarExecuting(DbCommand command, DbCommandInterceptionContext<object> interceptionContext) { ReadDbSelect(command); base.ScalarExecuting(command, interceptionContext); } /// <summary> /// Linq to Entity生成的(de)select,insert /// 發(fa)送到sqlserver之(zhi)前觸發(fa) /// warning:在select語句(ju)中DbCommand.Transaction為null,而ef會為每個insert添加一個DbCommand.Transaction進行包裹(guo) /// </summary> /// <param name="command"></param> /// <param name="interceptionContext"></param> public override void ReaderExecuting(DbCommand command, DbCommandInterceptionContext<DbDataReader> interceptionContext) { ReadDbSelect(command); base.ReaderExecuted(command, interceptionContext); } /// <summary> /// 發送到sqlserver之后觸發 /// </summary> /// <param name="command"></param> /// <param name="interceptionContext"></param> public override void ReaderExecuted(DbCommand command, DbCommandInterceptionContext<DbDataReader> interceptionContext) { base.ReaderExecuted(command, interceptionContext); } #endregion }