发布:2023/12/7 15:43:04作者:大数据 来源:大数据 浏览次数:594
前言
C#操作MySQL大量数据最常见的操作便是 select 读取数据,然后在C#中对数据进行处理, 完毕后再插入数据库中。 简而言之就 select -> process -> insert 三个步骤。 对于数据量小的情况下(百万级别 or 几百兆)可能
最多1个小时就处理完了。但是对于千万级数据可能几天,甚至更多。 那么问题来了,如何优化??
第一步 解决读取的问题
跟数据库打交道的方式有很多,我来列举下吧:
1. 【重武器-坦克大炮】使用重型ORM框架,比如 EF,NHibernat 这样的框架。
2. 【轻武器-AK47】 使用 Dapper,PetaPoco 之类,单cs文件。灵活高效,使用简单。居家越货必备(我更喜欢PetaPoco :))
3. 【冷兵器?匕首?】使用原生的Connection、Command。 然后写原生的SQL语句。。
分析:
【重武器】在我们这里肯定直接被PASS, 他们应该被用在大型项目中。
【轻武器】 Dapper,PetaPoco 看过源码你会发现用到了反射,虽然使用 IL和缓存技术 ,但是还是会影响读取效率,PASS
好吧那就只有使用匕首, 原生SQL 走起, 利用 DataReader 进行高效读取,并且使用 索引 取数据(更快),而不是列名。
大概的代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
using (var conn = new MySqlConnection( "Connection String..." )) { conn.Open(); //此处设置读取的超时,不然在海量数据时很容易超时 var c = new MySqlCommand( "set net_write_timeout=9999999; set net_read_timeout=9999999" , conn); c.ExecuteNonQuery(); MySqlCommand rcmd = new MySqlCommand(); rcmd.Connection = conn; rcmd.CommandText = @"SELECT `f1`,`f2` FROM `table1`" ; //设置命令的执行超时 rcmd.CommandTimeout = 99999999; var myData = rcmd.ExecuteReader(); while (myData.Read()) { var f1= myData.GetInt32(0); var f2= myData.GetString(1); //这里做数据处理.... } } |
哈哈,怎么样,代码非常原始,还是使用索引来取数据,很容易出错。 当然一切为了性能咱都忍了
第二步 数据处理
其实这一步,根据你的业务需要,代码肯定不一, 不过无非是一些 字符串处理 , 类型转换 的操作,这时候就是考验你的C#基础功底的时候了。 以及如何高效编写正则表达式。。。
具体代码也没法写啊 ,先看完 CLR via C# 在来跟我讨论吧 ,O(∩_∩)O哈哈哈~ 跳过。。。。
第三部 数据插入
如何批量插入才最高效呢? 有同学会说, 使用 事务 啊,BeginTransaction, 然后EndTransaction。 恩,这个的确可以提高插入效率。 但是还有更加高效的方法,那就是合并insert语句。
那么怎么合并呢?
1
|
insert into table (f1,f2) values(1, 'sss' ),(2, 'bbbb' ),(3, 'cccc' ); |
就是把values后面的全部用逗号,链接起来,然后一次性执行 。
当然不能一次性提交个100MB的SQL执行,MySQL服务器对每次执行命令的长度是有限制的。 通过 MySQL服务器端的 max_allowed_packet 属性可以查看, 默认是 1MB
咱们来看看伪代码吧
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
//使用StringBuilder高效拼接字符串 var sqlBuilder = new StringBuilder(); //添加insert 语句的头 string sqlHeader = "insert into table1 (`f1`,`f2`) values" ; sqlBuilder.Append(sqlHeader); using (var conn = new MySqlConnection( "Connection String..." )) { conn.Open(); //此处设置读取的超时,不然在海量数据时很容易超时 var c = new MySqlCommand( "set net_write_timeout=9999999; set net_read_timeout=9999999" , conn); c.ExecuteNonQuery(); MySqlCommand rcmd = new MySqlCommand(); rcmd.Connection = conn; rcmd.CommandText = @"SELECT `f1`,`f2` FROM `table1`" ; //设置命令的执行超时 rcmd.CommandTimeout = 99999999; var myData = rcmd.ExecuteReader(); while (myData.Read()) { var f1 = myData.GetInt32(0); var f2 = myData.GetString(1); //这里做数据处理.... sqlBuilder.AppendFormat( "({0},'{1}')," , f1,AddSlash(f2)); if (sqlBuilder.Length >= 1024 * 1024 * 1024) //当然这里的1MB length的字符串并不等于 1MB的Packet。。。我知道:) { insertCmd.Execute(sqlBuilder.Remove(sqlBuilder.Length-1,1).ToString()) //移除逗号,然后执行 sqlBuilder.Clear(); //清空 sqlBuilder.Append(sqlHeader); //在加上insert 头 } } } |
好了,到这里 大概的优化后的高效查询、插入就完成了。
总结
总结下来,无非2个关键技术点, DataReader、SQL合并, 都是一些老的技术啦。其实,上面的代码只能称得上高效, 但是, 却非常的不优雅。以上就是这篇文章的全部内容了,希望本文的内容对大家能有所帮助,如果有疑问大家可以留言交流。
在SQL Server 中插入一条数据使用Insert语句,但是如果想要批量插入一堆数据的话,循环使用Insert不仅效率低,而且会导致SQL一系统性能问题。下面介绍SQL Server支持的两种批量数据插入方法:Bulk和表值参数(Table-Valued Parameters)。
C#实现SQL批量插入数据到表的方法。分享给大家供大家参考,具体如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
#region 帮助实例:SQL 批量插入数据 多种方法 /// <summary> /// SqlBulkCopy往数据库中批量插入数据 /// </summary> /// <param name="sourceDataTable">数据源表</param> /// <param name="targetTableName">服务器上目标表</param> /// <param name="mapping">创建新的列映射,并使用列序号引用源列和目标列的列名称。</param> public static void BulkToDB(DataTable sourceDataTable, string targetTableName, SqlBulkCopyColumnMapping[] mapping) { /* 调用方法 -2012年11月16日编写 //DataTable dt = Get_All_RoomState_ByHID(); //SqlBulkCopyColumnMapping[] mapping = new SqlBulkCopyColumnMapping[4]; //mapping[0] = new SqlBulkCopyColumnMapping("Xing_H_ID", "Xing_H_ID"); //mapping[1] = new SqlBulkCopyColumnMapping("H_Name", "H_Name"); //mapping[2] = new SqlBulkCopyColumnMapping("H_sName", "H_sName"); //mapping[3] = new SqlBulkCopyColumnMapping("H_eName", "H_eName"); //BulkToDB(dt, "Bak_Tts_Hotel_Name", mapping); */ SqlConnection conn = new SqlConnection(SQLHelper.ConnectionString); SqlBulkCopy bulkCopy = new SqlBulkCopy(conn); //用其它源的数据有效批量加载sql server表中 bulkCopy.DestinationTableName = targetTableName; //服务器上目标表的名称 bulkCopy.BatchSize = sourceDataTable.Rows.Count; //每一批次中的行数 try { conn.Open(); if (sourceDataTable != null && sourceDataTable.Rows.Count != 0) { for (int i = 0; i < mapping.Length; i++) bulkCopy.ColumnMappings.Add(mapping[i]); //将提供的数据源中的所有行复制到目标表中 bulkCopy.WriteToServer(sourceDataTable ); } } catch (Exception ex) { //throw ex; Common.WriteTextLog("BulkToDB", ex.Message); } finally { conn.Close(); if (bulkCopy != null) bulkCopy.Close(); } } /// <summary> /// SQL2008以上方可支持自定义表类型 :调用存储过程游标-往数据库中批量插入数据 ,注意 /// </summary> /// <param name="sourceDataTable"></param> public void DataTableToHotelDB(DataTable sourceDataTable) { /* -2012年11月15日编写 ALTER PROCEDURE [dbo].[P_InsertSubject] @tempStudentID int AS DECLARE rs CURSOR LOCAL SCROLL FOR select H_ID from Tts_Hotel_Name OPEN rs FETCH NEXT FROM rs INTO @tempStudentID WHILE @@FETCH_STATUS = 0 BEGIN Insert student (tempStudentID) values (@tempStudentID) FETCH NEXT FROM rs INTO @tempStudentID END CLOSE rs * *************************************************************** * create table Orders ( Orders_ID int identity(1,1) primary key, ItemCode nvarchar(50) not null, UM nvarchar(20) not null, Quantity decimal(18,6) not null, UnitPrice decimal(18,6) not null ) --创建用户自定义表类型,在可编程性->类型性->用户自定义表类型 create type OrdersTableType as table ( ItemCode nvarchar(50) not null, UM nvarchar(20) not null, Quantity decimal(18,6) not null, UnitPrice decimal(18,6) not null ) go create procedure Pro_Orders ( @OrdersCollection OrdersTableType readonly ) as insert into Orders([ItemCode],[UM],[Quantity],[UnitPrice]) SELECT oc.[ItemCode],oc.[UM],[Quantity],oc.[UnitPrice] FROM @OrdersCollection AS oc; go * */ SqlParameter[] parameters = {new SqlParameter("@OrdersCollection", SqlDbType.Structured)}; parameters[0].Value = sourceDataTable; new SQLHelper().ExecuteScalar("P_DataTable_ToHotelDB", parameters, true); } #endregion |
C#中有时候需要将内存中的数据批量插入到数据库表中,使用for循环进行批量插入不但耗时而且会频繁操作数据库。
针对数据量很少的可以使用for循环插入,但是针对于数据量大的则不推荐使用for循环插入,推荐使用sql的块处理插入。
块处理不但耗时少而且不会频繁对数据库进行操作,只是需要注意的一点是DataTable中的列必须与表的列完全一致。
如下代码是批量插入的一个函数,自测可用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
#region 使用SqlBulkCopy将DataTable中的数据批量插入数据库中 /// <summary> /// 注意:DataTable中的列需要与数据库表中的列完全一致。 /// 已自测可用。 /// </summary> /// <param name="conStr">数据库连接串</param> /// <param name="strTableName">数据库中对应的表名</param> /// <param name="dtData">数据集</param> public static void SqlBulkCopyInsert(string conStr, string strTableName, DataTable dtData) { try { using (SqlBulkCopy sqlRevdBulkCopy = new SqlBulkCopy(conStr)) //引用SqlBulkCopy { sqlRevdBulkCopy.DestinationTableName = strTableName; //数据库中对应的表名 sqlRevdBulkCopy.NotifyAfter = dtData.Rows.Count; //有几行数据 sqlRevdBulkCopy.WriteToServer(dtData); //数据导入数据库 sqlRevdBulkCopy.Close(); //关闭连接 } } catch (Exception ex) { throw (ex); } } #endregion |
C#批量插入数据到Sqlserver中的四种方式
我的新书ASP.NET MVC企业级实战预计明年2月份出版,感谢大家关注!
本篇,我将来讲解一下在Sqlserver中批量插入数据。
先创建一个用来测试的数据库和表,为了让插入数据更快,表中主键采用的是GUID,表中没有创建任何索引。GUID必然是比自增长要快的,因为你生成一个GUID算法所花的时间肯定比你从数据表中重新查询上一条记录的ID的值然后再进行加1运算要少。而如果存在索引的情况下,每次插入记录都会进行索引重建,这是非常耗性能的。如果表中无可避免的存在索引,我们可以通过先删除索引,然后批量插入,最后再重建索引的方式来提高效率。
create database CarSYS;
go
use CarSYS;
go
CREATE TABLE Product(
Id UNIQUEIDENTIFIER PRIMARY KEY,
NAME VARCHAR(50) NOT NULL,
Price DECIMAL(18,2) NOT NULL
)
我们通过SQL脚本来插入数据,常见如下四种方式。
方式一:一条一条插入,性能最差,不建议使用。
INSERT INTO Product(Id,Name,Price) VALUES(newid(),'牛栏1段',160);
INSERT INTO Product(Id,Name,Price) VALUES(newid(),'牛栏2段',260);
......
方式二:insert bulk
语法如下:
BULK INSERT [ [ 'database_name'.][ 'owner' ].]{ 'table_name' FROM 'data_file' }
WITH (
[ BATCHSIZE [ = batch_size ] ],
[ CHECK_CONSTRAINTS ],
[ CODEPAGE [ = 'ACP' | 'OEM' | 'RAW' | 'code_page' ] ],
[ DATAFILETYPE [ = 'char' | 'native'| 'widechar' | 'widenative' ] ],
[ FIELDTERMINATOR [ = 'field_terminator' ] ],
[ FIRSTROW [ = first_row ] ],
[ FIRE_TRIGGERS ],
[ FORMATFILE = 'format_file_path' ],
[ KEEPIDENTITY ],
[ KEEPNULLS ],
[ KILOBYTES_PER_BATCH [ = kilobytes_per_batch ] ],
[ LASTROW [ = last_row ] ],
[ MAXERRORS [ = max_errors ] ],
[ ORDER ( { column [ ASC | DESC ] } [ ,...n ] ) ],
[ ROWS_PER_BATCH [ = rows_per_batch ] ],
[ ROWTERMINATOR [ = 'row_terminator' ] ],
[ TABLOCK ],
)
相关参数说明:
BULK INSERT
[ database_name . [ schema_name ] . | schema_name . ] [ table_name | view_name ]
FROM 'data_file'
[ WITH
(
[ [ , ] BATCHSIZE = batch_size ] --BATCHSIZE指令来设置在单个事务中可以插入到表中的记录的数量
[ [ , ] CHECK_CONSTRAINTS ] --指定在大容量导入操作期间,必须检查所有对目标表或视图的约束。若没有 CHECK_CONSTRAINTS 选项,则所有 CHECK 和 FOREIGN KEY 约束都将被忽略,并且在此操作之后表的约束将标记为不可信。
[ [ , ] CODEPAGE = { 'ACP' | 'OEM' | 'RAW' | 'code_page' } ] --指定该数据文件中数据的代码页
[ [ , ] DATAFILETYPE =
{ 'char' | 'native'| 'widechar' | 'widenative' } ] --指定 BULK INSERT 使用指定的数据文件类型值执行导入操作。
[ [ , ] FIELDTERMINATOR = 'field_terminator' ] --标识分隔内容的符号
[ [ , ] FIRSTROW = first_row ] --指定要加载的第一行的行号。默认值是指定数据文件中的第一行
[ [ , ] FIRE_TRIGGERS ] --是否启动触发器
[ [ , ] FORMATFILE = 'format_file_path' ]
[ [ , ] KEEPIDENTITY ] --指定导入数据文件中的标识值用于标识列
[ [ , ] KEEPNULLS ] --指定在大容量导入操作期间空列应保留一个空值,而不插入用于列的任何默认值
[ [ , ] KILOBYTES_PER_BATCH = kilobytes_per_batch ]
[ [ , ] LASTROW = last_row ] --指定要加载的最后一行的行号
[ [ , ] MAXERRORS = max_errors ] --指定允许在数据中出现的最多语法错误数,超过该数量后将取消大容量导入操作。
[ [ , ] ORDER ( { column [ ASC | DESC ] } [ ,...n ] ) ] --指定数据文件中的数据如何排序
[ [ , ] ROWS_PER_BATCH = rows_per_batch ]
[ [ , ] ROWTERMINATOR = 'row_terminator' ] --标识分隔行的符号
[ [ , ] TABLOCK ] --指定为大容量导入操作持续时间获取一个表级锁
[ [ , ] ERRORFILE = 'file_name' ] --指定用于收集格式有误且不能转换为 OLE DB 行集的行的文件。
)]
方式三:INSERT INTO xx select...
INSERT INTO Product(Id,Name,Price)
SELECT NEWID(),'牛栏1段',160
UNION ALL
SELECT NEWID(),'牛栏2段',180
UNION ALL
......
方式四:拼接SQL
INSERT INTO Product(Id,Name,Price) VALUES
(newid(),'牛栏1段',160)
,(newid(),'牛栏2段',260)
......
在C#中通过ADO.NET来实现批量操作存在四种与之对应的方式。
方式一:逐条插入
#region 方式一
static void InsertOne()
{
Console.WriteLine("采用一条一条插入的方式实现");
Stopwatch sw = new Stopwatch();
using (SqlConnection conn = new SqlConnection(StrConnMsg)) //using中会自动Open和Close 连接。
{
string sql = "INSERT INTO Product(Id,Name,Price) VALUES(newid(),@p,@d)";
conn.Open();
for (int i = 0; i < totalRow; i++)
{
using (SqlCommand cmd = new SqlCommand(sql, conn))
{
cmd.Parameters.AddWithValue("@p", "商品" + i);
cmd.Parameters.AddWithValue("@d", i);
sw.Start();
cmd.ExecuteNonQuery();
Console.WriteLine(string.Format("插入一条记录,已耗时{0}毫秒", sw.ElapsedMilliseconds));
}
if (i == getRow)
{
sw.Stop();
break;
}
}
}
Console.WriteLine(string.Format("插入{0}条记录,每{4}条的插入时间是{1}毫秒,预估总得插入时间是{2}毫秒,{3}分钟",
totalRow, sw.ElapsedMilliseconds, ((sw.ElapsedMilliseconds / getRow) * totalRow), GetMinute((sw.ElapsedMilliseconds / getRow * totalRow)), getRow));
}
static int GetMinute(long l)
{
return (Int32)l / 60000;
}
#endregion
运行结果如下:
我们会发现插入100w条记录,预计需要50分钟时间,每插入一条记录大概需要3毫秒左右。
方式二:使用SqlBulk
#region 方式二
static void InsertTwo()
{
Console.WriteLine("使用Bulk插入的实现方式");
Stopwatch sw = new Stopwatch();
DataTable dt = GetTableSchema();
using (SqlConnection conn = new SqlConnection(StrConnMsg))
{
SqlBulkCopy bulkCopy = new SqlBulkCopy(conn);
bulkCopy.DestinationTableName = "Product";
bulkCopy.BatchSize = dt.Rows.Count;
conn.Open();
sw.Start();
for (int i = 0; i < totalRow;i++ )
{
DataRow dr = dt.NewRow();
dr[0] = Guid.NewGuid();
dr[1] = string.Format("商品", i);
dr[2] = (decimal)i;
dt.Rows.Add(dr);
}
if (dt != null && dt.Rows.Count != 0)
{
bulkCopy.WriteToServer(dt);
sw.Stop();
}
Console.WriteLine(string.Format("插入{0}条记录共花费{1}毫秒,{2}分钟", totalRow, sw.ElapsedMilliseconds, GetMinute(sw.ElapsedMilliseconds)));
}
}
static DataTable GetTableSchema()
{
DataTable dt = new DataTable();
dt.Columns.AddRange(new DataColumn[] {
new DataColumn("Id",typeof(Guid)),
new DataColumn("Name",typeof(string)),
new DataColumn("Price",typeof(decimal))});
return dt;
}
#endregion
运行结果如下:
插入100w条记录才8s多,是不是很溜。
打开Sqlserver Profiler跟踪,会发现执行的是如下语句:
insert bulk Product ([Id] UniqueIdentifier, [NAME] VarChar(50) COLLATE Chinese_PRC_CI_AS, [Price] Decimal(18,2))
方式三:使用TVPs(表值参数)插入数据
从sqlserver 2008起开始支持TVPs。创建缓存表ProductTemp ,执行如下SQL。
CREATE TYPE ProductTemp AS TABLE(
Id UNIQUEIDENTIFIER PRIMARY KEY,
NAME VARCHAR(50) NOT NULL,
Price DECIMAL(18,2) NOT NULL
)
执行完成之后,会发现在数据库CarSYS下面多了一张缓存表ProductTemp
可见插入100w条记录共花费了11秒多。
方式四:拼接SQL
此种方法在C#中有限制,一次性只能批量插入1000条,所以就得分段进行插入。
#region 方式四
static void InsertFour()
{
Console.WriteLine("采用拼接批量SQL插入的方式实现");
Stopwatch sw = new Stopwatch();
using (SqlConnection conn = new SqlConnection(StrConnMsg)) //using中会自动Open和Close 连接。
{
conn.Open();
sw.Start();
for (int j = 0; j < totalRow / getRow;j++ )
{
StringBuilder sb = new StringBuilder();
sb.Append("INSERT INTO Product(Id,Name,Price) VALUES");
using (SqlCommand cmd = new SqlCommand())
{
for (int i = 0; i < getRow; i++)
{
sb.AppendFormat("(newid(),'商品{0}',{0}),", j*i+i);
}
cmd.Connection = conn;
cmd.CommandText = sb.ToString().TrimEnd(',');
cmd.ExecuteNonQuery();
}
}
sw.Stop();
Console.WriteLine(string.Format("插入{0}条记录,共耗时{1}毫秒",totalRow,sw.ElapsedMilliseconds));
}
}
#endregion
运行结果如下:
我们可以看到大概花费了10分钟。虽然在方式一的基础上,性能有了较大的提升,但是显然还是不够快。
总结:大数据批量插入方式一和方式四尽量避免使用,而方式二和方式三都是非常高效的批量插入数据方式。其都是通过构建DataTable的方式插入的,而我们知道DataTable是存在内存中的,所以当数据量特别特别大,大到内存中无法一次性存储的时候,可以分段插入。比如需要插入9千万条数据,可以分成9段进行插入,一次插入1千万条。而在for循环中直接进行数据库操作,我们是应该尽量避免的。每一次数据库的连接、打开和关闭都是比较耗时的,虽然在C#中存在数据库连接池,也就是当我们使用using或者conn.Close(),进行释放连接时,其实并没有真正关闭数据库连接,它只是让连接以类似于休眠的方式存在,当再次操作的时候,会从连接池中找一个休眠状态的连接,唤醒它,这样可以有效的提高并发能力,减少连接损耗。而连接池中的连接数,我们都是可以配置的。
源码下载:http://pan.baidu.com/s/1slm1wPv
https://www.cnblogs.com/jiekzou/p/6145550.html
© Copyright 2014 - 2024 柏港建站平台 ejk5.com. 渝ICP备16000791号-4