kmSqlReplicate/ClassObj/ClsReplicateBase.cs

308 lines
12 KiB
C#

using Dapper;
using Microsoft.Data.SqlClient;
using Microsoft.SqlServer.Management.Smo;
using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
namespace kmSqlReplicate.ClassObj;
internal class ClsReplicateBase
{
/// <summary>
/// Polling loop that drives replication until <c>Program.exitNow</c> is set.
/// </summary>
/// <remarks>
/// The first pass is due 5 seconds after start when a debugger is attached and 45 seconds
/// otherwise; each later pass is due 15 seconds after the previous one finished. Between
/// checks the thread sleeps for a random 750-1499 ms.
/// </remarks>
internal void Go()
{
    var jitter = new Random();
    int startupDelaySeconds = Debugger.IsAttached ? 5 : 45;
    DateTimeOffset nextRun = DateTimeOffset.Now.AddSeconds(startupDelaySeconds);

    while (!Program.exitNow)
    {
        bool isDue = DateTimeOffset.Now > nextRun;
        if (isDue)
        {
            ReplicateNow();
            // The next pass is scheduled relative to when this one finished.
            nextRun = DateTimeOffset.Now.AddSeconds(15);
        }

        int pauseMilliseconds = jitter.Next(750, 1500);
        Thread.Sleep(pauseMilliseconds);
    }
}
/// <summary>
/// Runs one replication pass: loads the job list from the replication database by calling
/// <c>dbo.[GetReplicationJobs]</c> and processes each returned job in turn.
/// </summary>
/// <remarks>
/// A failure inside one job is reported with the "Outside300" tag and does not stop the
/// remaining jobs. <see cref="AdvanceToNextTime"/> is called for every job whether or not it
/// succeeded. A failure outside the per-job handling (for example opening the replication
/// database) is reported and ends the pass. Nothing is thrown to the caller.
/// </remarks>
private void ReplicateNow()
{
    try
    {
        using (var cnRep = new SqlConnection(kmCommonLibsCore.Constants.cnSysReplication))
        {
            cnRep.Open();
            var jobs = cnRep.Query("dbo.[GetReplicationJobs]", commandType: CommandType.StoredProcedure).ToList();
            foreach (var job in jobs)
            {
                try
                {
                    ReplicateJob(cnRep, job);
                }
                catch (Exception ex1)
                {
                    kmCommonLibsCore.ClsErrorReporting.ErrorEncountered(ex1, "Outside300");
                }
                AdvanceToNextTime(job);
            }
        }
    }
    catch (Exception ex)
    {
        kmCommonLibsCore.ClsErrorReporting.ErrorEncountered(ex);
    }
}

/// <summary>
/// Replicates every source table matched by one job and then writes the per-table results
/// to <c>[dbo].[tbMain2]</c> through <paramref name="cnRep"/>.
/// </summary>
/// <param name="cnRep">Open connection to the replication database.</param>
/// <param name="job">Dynamic job row; this method reads FromServer, FromDatabase, FromTable,
/// ToServer, ToDatabase and repMasterID.</param>
private void ReplicateJob(SqlConnection cnRep, dynamic job)
{
    // Pull the dynamic fields into typed locals once so the helpers are statically bound.
    string fromServer = job.FromServer.ToString();
    string fromDatabase = job.FromDatabase.ToString();
    string tablePattern = job.FromTable;
    long repMasterID = Convert.ToInt64(job.repMasterID);
    string sourceConnectionString = BuildConnectionString(fromServer, fromDatabase);

    // The work table is created per job. Sharing one table across jobs would make a later
    // job re-process (and re-log) the tables that belong to earlier jobs.
    DataTable workTable = FindSourceTables(sourceConnectionString, repMasterID, tablePattern);
    if (workTable.Rows.Count == 0)
    {
        return;
    }

    string toServer = job.ToServer.ToString();
    string toDatabase = job.ToDatabase.ToString();

    var createOptions = new ScriptingOptions() // Drop and Create
    {
        Default = true,
        NoIdentities = true,
        DriAll = false,
        ScriptDrops = false,
        ScriptOwner = false,
        NoFileGroup = false,
        NoIndexPartitioningSchemes = true,
        NoTablePartitioningSchemes = true,
        ScriptForCreateDrop = true
    };
    var keyOptions = new ScriptingOptions() // Get all keys and indexes
    {
        Default = false,
        DriPrimaryKey = true,
        Indexes = true,
        ScriptOwner = false,
        NoFileGroup = false,
        NoIndexPartitioningSchemes = true,
        NoTablePartitioningSchemes = true,
        ScriptForCreateDrop = true
    };

    using (var cnSrc = new SqlConnection(sourceConnectionString))
    {
        cnSrc.Open();
        var sourceServer = new Server(new Microsoft.SqlServer.Management.Common.ServerConnection(cnSrc));
        Database sourceDb = sourceServer.Databases[fromDatabase];
        foreach (DataRow dr in workTable.Rows)
        {
            ReplicateTable(cnSrc, sourceDb, dr, toServer, toDatabase, createOptions, keyOptions);
        }
    }

    WriteJobLog(cnRep, workTable);
}

/// <summary>
/// Lists the user tables of the source database and returns a work table holding one row
/// for each table whose "schema.table" name matches <paramref name="tablePattern"/>
/// (regular expression, case-insensitive).
/// </summary>
private DataTable FindSourceTables(string sourceConnectionString, long repMasterID, string tablePattern)
{
    const string cmText = "Select *, Schema_Name(Schema_ID) + '.' + [name] [TableAndSchemaName], " +
        "@@SERVERNAME [ServerName], DB_Name() [DatabaseName], " +
        "Schema_Name(Schema_ID) [SchemaName], [name] [TableName] " +
        "From sys.Tables Where [type_desc]='USER_TABLE'";

    DataTable workTable = InitWorkTable();
    using (var cn = new SqlConnection(sourceConnectionString))
    {
        foreach (var source in cn.Query(cmText).ToList())
        {
            string tableAndSchemaName = source.TableAndSchemaName;
            if (!Regex.IsMatch(tableAndSchemaName, tablePattern, RegexOptions.IgnoreCase))
            {
                continue;
            }

            DataRow row = workTable.NewRow();
            row["repMasterID"] = repMasterID;
            row["ServerName"] = source.ServerName.ToString();
            row["DatabaseName"] = source.DatabaseName.ToString();
            row["SchemaName"] = source.SchemaName.ToString();
            row["TableName"] = source.TableName.ToString();
            workTable.Rows.Add(row);
        }
    }
    return workTable;
}

/// <summary>
/// Scripts one source table, recreates it on the destination and copies its rows.
/// The generated script is stored in the work row's "Sql" column.
/// </summary>
/// <exception cref="InvalidOperationException">The table named by the work row is not present
/// in <paramref name="sourceDb"/>.</exception>
private void ReplicateTable(SqlConnection cnSrc, Database sourceDb, DataRow dr, string toServer, string toDatabase,
    ScriptingOptions createOptions, ScriptingOptions keyOptions)
{
    string schemaName = dr["SchemaName"].ToString();
    string tableName = dr["TableName"].ToString();
    string quotedName = QuoteName(schemaName) + "." + QuoteName(tableName);

    Table sourceTable = sourceDb.Tables[tableName, schemaName];
    if (sourceTable is null)
    {
        throw new InvalidOperationException($"Source table {quotedName} was not found in the source database.");
    }

    var txtSql = new StringBuilder();
    txtSql.Append("DROP TABLE IF EXISTS ").Append(quotedName).Append("\nGO\n\n");
    txtSql.Append(string.Join("\n", sourceTable.Script(createOptions).Cast<string>()));
    txtSql.AppendLine("\nGO\n");
    txtSql.Append(string.Join("\n", sourceTable.Script(keyOptions).Cast<string>()));
    txtSql.AppendLine("\nGO\n");
    string ddl = txtSql.ToString();
    dr["Sql"] = ddl;

    using (var cnDst = new SqlConnection(BuildConnectionString(toServer, toDatabase)))
    {
        // If the destination table could not be recreated its state is unknown, so no rows
        // are copied into it; the failure has already been reported.
        if (!RecreateDestinationTable(cnDst, toServer, toDatabase, schemaName, tableName, ddl))
        {
            return;
        }
        CopyRows(cnSrc, cnDst, sourceTable, quotedName, dr);
    }
}

/// <summary>
/// Opens <paramref name="cnDst"/> and runs the drop/create script on it inside an SMO
/// transaction.
/// </summary>
/// <returns><c>true</c> when the script was committed; <c>false</c> when anything failed, in
/// which case the error has been reported with the "Inside901" tag and the transaction (if
/// one was started) has been rolled back.</returns>
private bool RecreateDestinationTable(SqlConnection cnDst, string toServer, string toDatabase,
    string schemaName, string tableName, string ddl)
{
    Microsoft.SqlServer.Management.Common.ServerConnection context = null;
    bool transactionOpen = false;
    try
    {
        cnDst.Open();
        var destinationServer = new Server(new Microsoft.SqlServer.Management.Common.ServerConnection(cnDst));
        context = destinationServer.ConnectionContext;
        Database destinationDb = destinationServer.Databases[toDatabase];
        if (destinationDb.Tables.Contains(tableName, schemaName))
        {
            // NOTE(review): the existing table's permissions are scripted here but the result
            // is never applied after the table is recreated. Presumably they were meant to be
            // re-applied - confirm the intent before either using or removing this.
            var permissionOptions = new ScriptingOptions() // Get the permissions
            {
                Default = false,
                Permissions = true
            };
            Table destinationTable = destinationDb.Tables[tableName, schemaName];
            string sqlTablePermissions = string.Join("\n", destinationTable.Script(permissionOptions).Cast<string>());
        }

        context.BatchSeparator = "GO";
        context.BeginTransaction();
        transactionOpen = true;
        context.ExecuteNonQuery(ddl);
        context.CommitTransaction();
        transactionOpen = false;
        return true;
    }
    catch (Exception ex2)
    {
        kmCommonLibsCore.ClsErrorReporting.ErrorEncountered(ex2, string.Format("Inside901 - {0}.{1}.{2}.{3}\n\n{4}",
            toServer, toDatabase, schemaName, tableName,
            Regex.Replace(ddl, "(\r\n|\n\r|\r|\n)", "<br />")));
        if (transactionOpen)
        {
            // Do not leave the DDL transaction open on the connection.
            try
            {
                context.RollBackTransaction();
            }
            catch (Exception exRollback)
            {
                kmCommonLibsCore.ClsErrorReporting.ErrorEncountered(exRollback, "Inside901");
            }
        }
        return false;
    }
}

/// <summary>
/// Bulk-copies all rows of the source table into the destination table inside a single
/// transaction and records the outcome ("Replicated", "RowsCopied", "Elapsed") on the work row.
/// </summary>
/// <remarks>
/// Computed columns are left out of the column mapping. Failures are reported with the
/// "Inside905" tag; the transaction is rolled back only if it has not been committed yet.
/// </remarks>
private void CopyRows(SqlConnection cnSrc, SqlConnection cnDst, Table sourceTable, string quotedName, DataRow dr)
{
    using (var transaction = cnDst.BeginTransaction())
    using (var bulkCopy = new SqlBulkCopy(cnDst, SqlBulkCopyOptions.KeepIdentity, transaction))
    {
        foreach (Column column in sourceTable.Columns)
        {
            if (!column.Computed)
            {
                bulkCopy.ColumnMappings.Add(column.Name, column.Name);
            }
        }
        if (bulkCopy.ColumnMappings.Count == 0)
        {
            return;
        }

        bulkCopy.DestinationTableName = quotedName;
        bulkCopy.BulkCopyTimeout = 21600;

        using (var countCommand = new SqlCommand("Select Count(*) Cnt From " + quotedName, cnDst) { CommandTimeout = 0 })
        using (var selectCommand = new SqlCommand("Select * From " + quotedName, cnSrc) { CommandTimeout = 0 })
        using (var reader = selectCommand.ExecuteReader())
        {
            bool committed = false;
            try
            {
                var stopwatch = Stopwatch.StartNew();
                bulkCopy.WriteToServer(reader);
                transaction.Commit();
                committed = true;
                stopwatch.Stop();
                dr["Replicated"] = DateTimeOffset.Now;
                dr["RowsCopied"] = Convert.ToInt64(countCommand.ExecuteScalar());
                dr["Elapsed"] = string.Format(@"{0:h\:mm\:ss\.fff}", stopwatch.Elapsed);
            }
            catch (Exception ex2)
            {
                kmCommonLibsCore.ClsErrorReporting.ErrorEncountered(ex2, "Inside905");
                // Rolling back a transaction that was already committed throws, which would
                // abort the rest of the job over a failure in the statistics query.
                if (!committed)
                {
                    transaction.Rollback();
                }
            }
        }
    }
}

/// <summary>
/// Writes all rows of the job's work table to <c>[dbo].[tbMain2]</c> in one transaction.
/// </summary>
private static void WriteJobLog(SqlConnection cnRep, DataTable workTable)
{
    using (var transaction = cnRep.BeginTransaction())
    using (var bulkCopy = new SqlBulkCopy(cnRep, SqlBulkCopyOptions.KeepIdentity, transaction))
    {
        foreach (DataColumn column in workTable.Columns)
        {
            bulkCopy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
        }
        bulkCopy.DestinationTableName = "[dbo].[tbMain2]";
        bulkCopy.BulkCopyTimeout = 21600;
        bulkCopy.WriteToServer(workTable);
        transaction.Commit();
    }
}

/// <summary>
/// Builds an integrated-security connection string for the given server and database.
/// The builder escapes the values, so characters such as ';' in a name cannot alter the
/// connection string.
/// </summary>
private static string BuildConnectionString(string server, string database)
{
    var builder = new SqlConnectionStringBuilder
    {
        DataSource = server,
        InitialCatalog = database,
        IntegratedSecurity = true,
        TrustServerCertificate = true
    };
    return builder.ConnectionString;
}

/// <summary>
/// Wraps an identifier in square brackets, doubling any closing bracket inside it.
/// </summary>
private static string QuoteName(string identifier)
{
    return "[" + identifier.Replace("]", "]]") + "]";
}
/// <summary>
/// Steps a job's next execution time forward by the job's frequency until it lies in the
/// future, then stores it in <c>[dbo].[tbMain1]</c> together with the current time as the
/// last execution time.
/// </summary>
/// <param name="itm">Dynamic job row; this method reads <c>Frequency</c>,
/// <c>dtNextExecution</c> and <c>repMasterID</c>.</param>
/// <remarks>
/// <c>Frequency</c> must start with day, hour, minute, week or month (any casing) and contain
/// a positive number, for example "hour 2". If either part is missing the schedule is left
/// untouched. Month intervals use the whole-number part only, with a minimum of one month.
/// Errors are reported through <c>ClsErrorReporting</c> and are not rethrown.
/// </remarks>
private void AdvanceToNextTime(dynamic itm)
{
    try
    {
        const string intervalTypePattern = "^(day|hour|minute|week|month)";
        const string intervalNumberPattern = @"([0-9.]{1,})+";

        string frequency = itm.Frequency;

        // Match the unit case-insensitively; the matched text is lower-cased for the switch.
        Match typeMatch = Regex.Match(frequency, intervalTypePattern,
            RegexOptions.IgnoreCase | RegexOptions.CultureInvariant);
        string intervalType = typeMatch.Success ? typeMatch.Value.ToLowerInvariant() : string.Empty;

        // Parse with the invariant culture so "1.5" means one and a half on every machine.
        double intervalNumber = 0;
        Match numberMatch = Regex.Match(frequency, intervalNumberPattern);
        foreach (Capture capture in numberMatch.Groups[1].Captures)
        {
            if (double.TryParse(capture.Value, System.Globalization.NumberStyles.Float,
                System.Globalization.CultureInfo.InvariantCulture, out intervalNumber))
            {
                break;
            }
        }

        if (string.IsNullOrWhiteSpace(intervalType) || !(intervalNumber > 0))
        {
            return;
        }

        var dtNextExecution = (DateTimeOffset)itm.dtNextExecution;
        while (dtNextExecution < DateTimeOffset.Now)
        {
            DateTimeOffset advanced = intervalType switch
            {
                "day" => dtNextExecution.AddDays(intervalNumber),
                "hour" => dtNextExecution.AddHours(intervalNumber),
                "minute" => dtNextExecution.AddMinutes(intervalNumber),
                "week" => dtNextExecution.AddDays(intervalNumber * 7),
                // A fractional value below 1 would truncate to 0 months and never advance.
                "month" => dtNextExecution.AddMonths(Math.Max(1, Convert.ToInt32(Math.Truncate(intervalNumber)))),
                _ => DateTimeOffset.Now.AddMinutes(30),
            };

            // Guard against an interval too small to move the timestamp, which would
            // otherwise keep this loop spinning forever.
            if (advanced <= dtNextExecution)
            {
                throw new InvalidOperationException(
                    $"Frequency '{frequency}' does not advance the next execution time.");
            }
            dtNextExecution = advanced;
        }

        long repMasterID = Convert.ToInt64(itm.repMasterID);
        using (var cn = new SqlConnection(kmCommonLibsCore.Constants.cnSysReplication))
        {
            cn.Execute("Update [dbo].[tbMain1] Set [dtNextExecution]=@dtNextExecution, [dtLastExecution]=SysDateTimeOffset() Where [repMasterID]=@repMasterID",
                new
                {
                    repMasterID = repMasterID,
                    dtNextExecution = dtNextExecution
                });
        }
    }
    catch (Exception ex)
    {
        kmCommonLibsCore.ClsErrorReporting.ErrorEncountered(ex);
    }
}
/// <summary>
/// Creates the empty in-memory work table that collects one row per replicated table.
/// </summary>
/// <returns>A new <see cref="DataTable"/> with the columns repMasterID, ServerName,
/// DatabaseName, SchemaName, TableName, RowsCopied, Sql, Replicated and Elapsed, in that
/// order, and no rows.</returns>
private DataTable InitWorkTable()
{
    // Column name and CLR type, listed in the order the columns are added.
    (string Name, Type ClrType)[] layout =
    {
        ("repMasterID", typeof(long)),
        ("ServerName", typeof(string)),
        ("DatabaseName", typeof(string)),
        ("SchemaName", typeof(string)),
        ("TableName", typeof(string)),
        ("RowsCopied", typeof(long)),
        ("Sql", typeof(string)),
        ("Replicated", typeof(DateTimeOffset)),
        ("Elapsed", typeof(string)),
    };

    var workTable = new DataTable();
    foreach (var (name, clrType) in layout)
    {
        workTable.Columns.Add(name, clrType);
    }
    return workTable;
}
}