EF架(jia)構~通過EF6的(de)DbCommand攔截器來實現(xian)數據庫讀寫(xie)分離~終結~配置的(de)優(you)化和事(shi)務(wu)里讀寫(xie)的(de)統一
本講是通過DbCommand攔截器來實現讀寫分離的最后一講,對之前幾篇文章做了一個優化,無論是程序可讀性還是實用性上都有一個提升,在配置信息這塊,去除了字符串方式的拼接,取而代之的是section數組,這樣在修改配置時更加清晰了;而實用性上,徹底改變了讀(du)和寫不(bu)能共用一個倉儲對(dui)象的缺點(dian),并(bing)且在一個(ge)事務里可以讀(du)寫并(bing)存,并(bing)為了數據的一致性,使事務里的curd操作指向主庫(ku),這一點(dian)很(hen)重要!
前幾篇文章的目錄
EF架構~通過EF6的DbCommand攔截器來實現數據庫讀寫分離~再續~添加對各只讀服務器的心跳檢測 (2015-01-09 17:52)
EF架構~通過EF6的DbCommand攔截器來實現數據庫讀寫分離~續~添加事務機制 (2015-01-08 14:08)
EF架構~通過EF6的DbCommand攔截器來實現數據庫讀寫分離 (2015-01-07 17:31)
功能架構圖如下
下面我們來分塊看一下這次的修改
一 配置文件的修改
<configSections>
<section name="DistributedReadWriteSection" type="Project.DistributedReadWriteForEF.DistributedReadWriteSectionHandler, Project.DistributedReadWriteForEF"/>
</configSections>
<DistributedReadWriteSection>
<add key="readDb1" Ip="192.168.2.71" Port="1433" DbName="background_read1" UserId="sa" Password="zzl123" />
<add key="readDb2" Ip="192.168.2.71" Port="1433" DbName="TestWrite_Read_Zzl" UserId="sa" Password="zzl123" />
<add key="readDb3" Ip="192.168.2.29" Port="1433" DbName="TestWrite_Read_Zzl" UserId="sa" Password="1" />
</DistributedReadWriteSection>
<appSettings>
<!-- 只讀服務器的sql連接串配置模版-->
<add key ="readDbConnection" value="data source={0};initial catalog={1};persist security info=True;user id={2};password={3};multipleactiveresultsets=True;application name=EntityFramework"/>
<add key ="writeDbConnection" value="data source=.;initial catalog=background;persist security info=True;user id=sa;password=zzl123;multipleactiveresultsets=True;application name=EntityFramework"/>
</appSettings>
/// <summary> /// redis配置信息加載(zai) /// </summary> internal class DistributedReadWriteManager { /// <summary> /// 配置信(xin)息(xi)實體 /// </summary> public static IList<DistributedReadWriteSection> Instance { get { return GetSection(); } } private static IList<DistributedReadWriteSection> GetSection() { var dic = ConfigurationManager.GetSection("DistributedReadWriteSection") as Dictionary<string, DistributedReadWriteSection>; return dic.Values.ToList(); } }
/// <summary> /// DistributedReadWriteForEFSection塊(kuai),在web.config中提供(gong)DistributedReadWriteForEFSection塊(kuai)定義(yi) /// </summary> internal class DistributedReadWriteSection : ConfigurationSection { /// <summary> /// 主機地(di)址 /// </summary> [ConfigurationProperty("Ip", DefaultValue = "127.0.0.1")] public string Ip { get { return (string)this["Ip"]; } set { this["Ip"] = value; } } /// <summary> /// 端口(kou)號(hao) /// </summary> [ConfigurationProperty("Port", DefaultValue = "1433")] public int Port { get { return (int)this["Port"]; } set { this["Port"] = value; } } /// <summary> /// 數據庫名(ming)稱 /// </summary> [ConfigurationProperty("DbName", DefaultValue = "Test")] public string DbName { get { return (string)this["DbName"]; } set { this["DbName"] = value; } } /// <summary> /// 數據庫(ku)賬號 /// </summary> [ConfigurationProperty("UserId", DefaultValue = "sa")] public string UserId { get { return (string)this["UserId"]; } set { this["UserId"] = value; } } /// <summary> /// 數(shu)據庫(ku)賬(zhang)號 /// </summary> [ConfigurationProperty("Password", DefaultValue = "sa")] public string Password { get { return (string)this["Password"]; } set { this["Password"] = value; } } }
internal class DistributedReadWriteSectionHandler : IConfigurationSectionHandler { #region IConfigurationSectionHandler 成員 public object Create(object parent, object configContext, System.Xml.XmlNode section) { Dictionary<string, DistributedReadWriteSection> names = new Dictionary<string, DistributedReadWriteSection>(); string _key = string.Empty; string _ip = string.Empty; string _dbName = string.Empty; string _userId = string.Empty; string _password = string.Empty; int _port = 1433; foreach (XmlNode childNode in section.ChildNodes) { if (childNode.Attributes["key"] != null) { _key = childNode.Attributes["key"].Value; if (childNode.Attributes["Ip"] != null) { _ip = childNode.Attributes["Ip"].Value; } if (childNode.Attributes["Port"] != null) { _port = Convert.ToInt32(childNode.Attributes["Port"].Value); } if (childNode.Attributes["DbName"] != null) { _dbName = childNode.Attributes["DbName"].Value; } if (childNode.Attributes["UserId"] != null) { _userId = childNode.Attributes["UserId"].Value; } if (childNode.Attributes["Password"] != null) { _password = childNode.Attributes["Password"].Value; } names.Add(_key, new DistributedReadWriteSection { Ip = _ip, Port = _port, DbName = _dbName, UserId = _userId, Password = _password }); } } return names; } #endregion }
二 倉儲大叔事務塊修改
public static void UsingNoMsdtc(IUnitOfWork db, bool isOutest, Action action) { var objectContext = ((System.Data.Entity.Infrastructure.IObjectContextAdapter)db).ObjectContext; try { objectContext.Connection.Close(); //強(qiang)制(zhi)將所有curd操作維持到主庫 Project.DistributedReadWriteForEF.CommandInterceptor.IsTransactionScope = true; //重新設置鏈接串 if (System.Configuration.ConfigurationManager.AppSettings["writeDbConnection"] != null) objectContext.TransactionHandler.DbContext.Database.Connection.ConnectionString = System.Configuration.ConfigurationManager.AppSettings["writeDbConnection"]; objectContext.Connection.Open(); using (TransactionScope trans = new TransactionScope()) { action(); trans.Complete(); Project.DistributedReadWriteForEF.CommandInterceptor.IsTransactionScope = false;//事務結束將走(zou)讀寫分離(li) } } finally { if (isOutest)//如(ru)果是(shi)最外(wai)層(ceng)事(shi)(shi)務,再將連接關閉(bi)!內(nei)部事(shi)(shi)務與(yu)外(wai)部事(shi)(shi)務需要共用一(yi)個Connection的(de)連接 objectContext.Connection.Close(); //只能(neng)關閉,不能(neng)dispose,因(yin)為dispose之(zhi)后,上下(xia)文就(jiu)無法得到鏈接串了 } }
三 DbCommand攔截器的修改
/// <summary> /// SQL命令攔截(jie)器 /// 主要實現EF的(de)讀寫分離 /// </summary> public class CommandInterceptor : DbCommandInterceptor { static CommandInterceptor() { readConnList = DistributedReadWriteManager.Instance; sysTimer.Enabled = true; sysTimer.Elapsed += sysTimer_Elapsed; sysTimer.Start(); } /// <summary> /// 是(shi)否在一個(ge)事務中,如果(guo)是(shi)select,insert,update,delete都走(zou)主庫 /// ThreadStatic標識它只在當前線程(cheng)有(you)效 /// </summary> [ThreadStatic] public static bool IsTransactionScope = false; /// <summary> /// 鎖住它 /// </summary> private static object lockObj = new object(); /// <summary> /// 定期(qi)找(zhao)沒有(you)在線的數據(ju)庫服務器 /// </summary> private static Timer sysTimer = new Timer(5000); /// <summary> /// 讀庫(ku),從庫(ku)集群,寫庫(ku)不用設置走(zou)默認的EF框架 /// </summary> private static IList<DistributedReadWriteSection> readConnList; #region Private Methods private static void sysTimer_Elapsed(object sender, ElapsedEventArgs e) { if (readConnList != null && readConnList.Any()) { foreach (var item in readConnList) { //心跳測試,將死掉的服(fu)務器IP從(cong)列(lie)表(biao)中移除 var client = new TcpClient(); try { client.Connect(new IPEndPoint(IPAddress.Parse(item.Ip), item.Port)); } catch (SocketException) { //異常,沒有(you)連接(jie)上 readConnList.Remove(item); } if (!client.Connected) { readConnList.Remove(item); } } } } /// <summary> /// 處理讀庫字符串 /// </summary> /// <returns></returns> private string GetReadConn() { if (readConnList != null && readConnList.Any()) { var resultConn = readConnList[Convert.ToInt32(Math.Floor((double)new Random().Next(0, readConnList.Count)))]; return string.Format(System.Configuration.ConfigurationManager.AppSettings["readDbConnection"] , resultConn.Ip , resultConn.DbName , resultConn.UserId , resultConn.Password); } return string.Empty; } /// <summary> /// 只讀(du)庫的選(xuan)擇,加工command對(dui)象(xiang) /// 說明:事(shi)務中,所有語(yu)句都(dou)走主庫(ku),事(shi)務外(wai)select走讀庫(ku),insert,update,delete走主庫(ku) /// 希望:一個WEB請求中,讀與寫的倉儲使(shi)用一個,不需要(yao)在程序中去重新(xin)定義 /// </summary> /// <param name="command"></param> private void ReadDbSelect(DbCommand command) { if (!string.IsNullOrWhiteSpace(GetReadConn()))//如果(guo)配置了(le)讀(du)寫分離,就去實現 { command.Connection.Close(); if (!command.CommandText.StartsWith("insert", StringComparison.InvariantCultureIgnoreCase) && !IsTransactionScope) command.Connection.ConnectionString = GetReadConn(); command.Connection.Open(); } } #endregion #region Override Methods /// <summary> /// Linq to Entity生成的update,delete /// </summary> /// <param name="command"></param> /// <param name="interceptionContext"></param> public override void NonQueryExecuting(DbCommand command, DbCommandInterceptionContext<int> interceptionContext) { base.NonQueryExecuting(command, interceptionContext);//update,delete等寫操作直(zhi)接(jie)走主庫 } /// <summary> /// 執行sql語句,并返(fan)回第一行第一列,沒有找到返(fan)回null,如果(guo)數據庫中值為null,則返(fan)回 DBNull.Value /// </summary> /// <param name="command"></param> /// <param name="interceptionContext"></param> public override void ScalarExecuting(DbCommand command, DbCommandInterceptionContext<object> interceptionContext) { ReadDbSelect(command); base.ScalarExecuting(command, interceptionContext); } /// <summary> /// Linq to Entity生成的select,insert /// 發送(song)到sqlserver之前觸發 /// warning:在select語句中DbCommand.Transaction為null,而ef會為每(mei)個insert添加一個DbCommand.Transaction進(jin)行(xing)包裹 /// </summary> /// <param name="command"></param> /// <param name="interceptionContext"></param> public override void ReaderExecuting(DbCommand command, DbCommandInterceptionContext<DbDataReader> interceptionContext) { ReadDbSelect(command); base.ReaderExecuted(command, interceptionContext); } /// <summary> /// 發送到sqlserver之后觸發 /// </summary> /// <param name="command"></param> /// <param name="interceptionContext"></param> public override void ReaderExecuted(DbCommand command, DbCommandInterceptionContext<DbDataReader> interceptionContext) { base.ReaderExecuted(command, interceptionContext); } #endregion }
好了,到(dao)這里,通過(guo)攔截器來(lai)實(shi)現(xian)數(shu)據庫讀(du)寫分離的(de)方案就徹底完成了,這個(ge)版本應該(gai)算是個(ge)終級了吧,呵呵!感謝您(nin)的(de)閱讀(du)!