2 // System.Data.SqlClient.SqlBulkCopy.cs
5 // Nagappan A (anagappan@novell.com)
7 // (C) Novell, Inc 2007
10 // Copyright (C) 2007 Novell, Inc (http://www.novell.com)
12 // Permission is hereby granted, free of charge, to any person obtaining
13 // a copy of this software and associated documentation files (the
14 // "Software"), to deal in the Software without restriction, including
15 // without limitation the rights to use, copy, modify, merge, publish,
16 // distribute, sublicense, and/or sell copies of the Software, and to
17 // permit persons to whom the Software is furnished to do so, subject to
18 // the following conditions:
20 // The above copyright notice and this permission notice shall be
21 // included in all copies or substantial portions of the Software.
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
34 using System.Data.Common;
35 using System.Threading;
36 using System.Threading.Tasks;
38 using Mono.Data.Tds.Protocol;
40 namespace System.Data.SqlClient {
41 /// <summary>Efficient way to bulk load SQL Server table with several data rows at once</summary>
42 public sealed class SqlBulkCopy : IDisposable
// Error text used by the three-argument constructor when UseInternalTransaction
// is requested together with a non-null external transaction.
45 private const string transConflictMessage = "Must not specify SqlBulkCopyOptions.UseInternalTransaction " +
46 "and pass an external Transaction at the same time.";
// Mask of the options that BulkCopyToServer turns into a "WITH (...)" clause
// on the generated "insert bulk" statement.
48 private const SqlBulkCopyOptions insertModifiers =
49 SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.TableLock |
50 SqlBulkCopyOptions.KeepNulls | SqlBulkCopyOptions.FireTriggers;
// Backing fields for the BatchSize, NotifyAfter and BulkCopyTimeout properties.
55 private int _batchSize = 0;
56 private int _notifyAfter = 0;
57 private int _bulkCopyTimeout = 0;
// Backing field for ColumnMappings; an empty collection means "no explicit mapping".
58 private SqlBulkCopyColumnMappingCollection _columnMappingCollection = new SqlBulkCopyColumnMappingCollection ();
// Backing field for DestinationTableName; BulkCopyToServer rejects a null value.
59 private string _destinationTableName = null;
// Set by BulkCopyToServer when the first mapping carries a source ordinal (SourceOrdinal != -1).
60 private bool ordinalMapping = false;
// Checked before closing; NOTE(review): no visible code ever assigns true to this flag.
61 private bool sqlRowsCopied = false;
// True when this instance created the connection itself from a connection string.
62 private bool isLocalConnection = false;
63 private SqlConnection connection;
// Optional caller-supplied transaction; attached to the metadata command when non-null.
64 private SqlTransaction externalTransaction;
65 private SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default;
/// <summary>Creates a bulk copy that works through a caller-supplied connection.</summary>
/// <param name="connection">Connection to use; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="connection"/> is null.</exception>
public SqlBulkCopy (SqlConnection connection)
{
	if (connection == null)
		throw new ArgumentNullException ("connection");

	// The caller keeps ownership: isLocalConnection stays false.
	this.connection = connection;
}
/// <summary>Creates a bulk copy over a new connection built from a connection string.</summary>
/// <param name="connectionString">Connection string for the new connection; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="connectionString"/> is null.</exception>
public SqlBulkCopy (string connectionString)
{
	if (connectionString == null)
		throw new ArgumentNullException ("connectionString");

	// The connection is created here, so this instance is flagged as owning it.
	SqlConnection ownedConnection = new SqlConnection (connectionString);
	this.connection = ownedConnection;
	isLocalConnection = true;
}
/// <summary>
/// Creates a bulk copy over a new connection built from a connection string,
/// using the supplied copy options.
/// </summary>
/// <param name="connectionString">Connection string for the new connection; must not be null.</param>
/// <param name="copyOptions">Options applied to the copy.</param>
/// <exception cref="ArgumentNullException"><paramref name="connectionString"/> is null.</exception>
/// <exception cref="NotImplementedException">
/// <paramref name="copyOptions"/> includes UseInternalTransaction, which is not supported.
/// </exception>
public SqlBulkCopy (string connectionString, SqlBulkCopyOptions copyOptions)
{
	if (connectionString == null)
		throw new ArgumentNullException ("connectionString");

	// Created before the option check, so connection-string errors surface first.
	SqlConnection ownedConnection = new SqlConnection (connectionString);
	this.connection = ownedConnection;
	isLocalConnection = true;

	bool wantsInternalTransaction =
		(copyOptions & SqlBulkCopyOptions.UseInternalTransaction) == SqlBulkCopyOptions.UseInternalTransaction;
	if (wantsInternalTransaction) {
		throw new NotImplementedException ("We don't know how to process UseInternalTransaction option.");
	}

	this.copyOptions = copyOptions;
}
/// <summary>
/// Creates a bulk copy over a caller-supplied connection, with copy options and an
/// optional external transaction.
/// </summary>
/// <param name="connection">Connection to use; must not be null.</param>
/// <param name="copyOptions">Options applied to the copy.</param>
/// <param name="externalTransaction">Transaction to attach to issued commands, or null.</param>
/// <exception cref="ArgumentNullException"><paramref name="connection"/> is null.</exception>
/// <exception cref="ArgumentException">
/// UseInternalTransaction is requested together with a non-null <paramref name="externalTransaction"/>.
/// </exception>
/// <exception cref="NotImplementedException">
/// UseInternalTransaction is requested; that option is not supported.
/// </exception>
public SqlBulkCopy (SqlConnection connection, SqlBulkCopyOptions copyOptions, SqlTransaction externalTransaction)
{
	if (connection == null)
		throw new ArgumentNullException ("connection");

	bool wantsInternalTransaction =
		(copyOptions & SqlBulkCopyOptions.UseInternalTransaction) == SqlBulkCopyOptions.UseInternalTransaction;
	if (wantsInternalTransaction) {
		// The conflict with an external transaction is reported in preference to
		// the "not supported" error, exactly as before.
		if (externalTransaction != null)
			throw new ArgumentException (transConflictMessage);
		throw new NotImplementedException ("We don't know how to process UseInternalTransaction option.");
	}

	this.connection = connection;
	this.externalTransaction = externalTransaction;
	this.copyOptions = copyOptions;
}
/// <summary>
/// Gets or sets the stored batch-size setting. The value is kept as given;
/// no validation is applied here.
/// </summary>
public int BatchSize {
	get {
		return _batchSize;
	}
	set {
		_batchSize = value;
	}
}
/// <summary>
/// Gets or sets the stored bulk-copy timeout setting. The value is kept as given;
/// no validation is applied here.
/// </summary>
public int BulkCopyTimeout {
	get {
		return _bulkCopyTimeout;
	}
	set {
		_bulkCopyTimeout = value;
	}
}
/// <summary>
/// Gets the collection of column mappings. The property is read-only, but the
/// returned collection is the live instance consulted during the copy.
/// </summary>
public SqlBulkCopyColumnMappingCollection ColumnMappings {
	get {
		return _columnMappingCollection;
	}
}
/// <summary>
/// Gets or sets the name of the destination table. The value is stored as given
/// and is concatenated directly into the SQL text built by this class.
/// </summary>
public string DestinationTableName {
	get {
		return _destinationTableName;
	}
	set {
		_destinationTableName = value;
	}
}
/// <summary>Not implemented: both accessors always throw.</summary>
/// <exception cref="NotImplementedException">Thrown on every read or write.</exception>
public bool EnableStreaming {
	get {
		throw new NotImplementedException ();
	}
	set {
		throw new NotImplementedException ();
	}
}
/// <summary>
/// Gets or sets the number of copied rows after which the SqlRowsCopied event is raised.
/// A value of 0 disables the notification.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">The assigned value is negative.</exception>
public int NotifyAfter {
	get { return _notifyAfter; }
	set {
		if (value < 0) {
			// The single-string constructor takes a parameter name, not a message,
			// so the two-argument overload is used to report both correctly.
			throw new ArgumentOutOfRangeException ("value", "NotifyAfter should be greater than or equal to 0");
		}
		_notifyAfter = value;
	}
}
// NOTE(review): the method header for these statements is not visible here;
// judging by the error message below, this is the body of Close — confirm.
// Refuses to proceed while the sqlRowsCopied flag is set.
172 if (sqlRowsCopied == true) {
173 throw new InvalidOperationException ("Close should not be called from SqlRowsCopied event");
// Guard for a missing or already-closed connection; the action taken is not visible here.
175 if (connection == null || connection.State == ConnectionState.Closed) {
// Queries the server for metadata about DestinationTableName and returns it as a
// two-element array: slot 0 receives the reader's schema table, slot 1 a DataTable
// filled from a later result set (the collation query, going by the batch order below).
// NOTE(review): DestinationTableName is concatenated into the SQL text unescaped,
// including inside a quoted literal; a name containing quotes or brackets alters the batch.
// NOTE(review): the reader and command are not visibly disposed in this method.
181 private DataTable [] GetColumnMetaData ()
183 DataTable [] columnMetaDataTables = new DataTable [2];
// One batch with three parts: @@trancount, a FMTONLY select of the destination
// table, and sp_tablecollations_90 for the same table.
184 SqlCommand cmd = new SqlCommand ("select @@trancount; " +
185 "set fmtonly on select * from " +
186 DestinationTableName + " set fmtonly off;" +
187 "exec sp_tablecollations_90 '" +
188 DestinationTableName + "'",
// Run inside the caller's transaction when one was supplied.
191 if (externalTransaction != null)
192 cmd.Transaction = externalTransaction;
194 SqlDataReader reader = cmd.ExecuteReader ();
// i counts result sets; results are stored at [i - 1], so the first result
// (@@trancount) is skipped. The lines that test and advance i are not visible here.
195 int i = 0; // Skipping 1st result
198 columnMetaDataTables [i - 1] = reader.GetSchemaTable ();
200 SqlDataAdapter adapter = new SqlDataAdapter ();
201 adapter.MissingSchemaAction = MissingSchemaAction.AddWithKey;
202 columnMetaDataTables [i - 1] = new DataTable (DestinationTableName);
203 adapter.FillInternal (columnMetaDataTables [i - 1], reader);
// Continue while the reader is still open and has another result set.
206 } while (reader.IsClosed == false && reader.NextResult());
208 return columnMetaDataTables;
// Does two things from the destination schema in colMetaData:
//   1. adds one SqlParameter per destination column to tmpCmd.Parameters, and
//   2. builds the "[name] type" column list text in `statement`, adding a COLLATE
//      clause for text columns when tableCollations has a matching row.
// The caller appends the returned text to its "insert bulk" statement.
// When column mappings exist, both passes compare against the mappings; the
// actions taken on a match are on lines not visible here.
211 private string GenerateColumnMetaData (SqlCommand tmpCmd, DataTable colMetaData, DataTable tableCollations)
214 string statement = "";
// Pass 1: create the parameters.
216 foreach (DataRow row in colMetaData.Rows) {
218 foreach (DataColumn col in colMetaData.Columns) { // FIXME: This line not required, remove later
220 if (_columnMappingCollection.Count > 0) {
221 if (ordinalMapping) {
222 foreach (SqlBulkCopyColumnMapping mapping
223 in _columnMappingCollection) {
224 if (mapping.DestinationOrdinal == i) {
230 foreach (SqlBulkCopyColumnMapping mapping
231 in _columnMappingCollection) {
232 if (mapping.DestinationColumn == (string) row ["ColumnName"]) {
// Read-only columns get special handling (presumably skipped — body not visible; confirm).
241 if ((bool)row ["IsReadOnly"]) {
247 SqlParameter param = new SqlParameter ((string) row ["ColumnName"],
248 ((SqlDbType) row ["ProviderType"]));
// A ColumnSize of -1 leaves the parameter size at its default.
250 if ((int)row ["ColumnSize"] != -1) {
251 param.Size = (int) row ["ColumnSize"];
// 255 is treated as "not specified" for precision and scale: the parameter
// value is only assigned when the schema reports something else.
254 short numericPresision = (short)row ["NumericPrecision"];
255 if (numericPresision != 255) {
256 param.Precision = (byte) numericPresision;
259 short numericScale = (short)row ["NumericScale"];
260 if (numericScale != 255) {
261 param.Scale = (byte) numericScale;
264 param.IsNullable = (bool)row ["AllowDBNull"];
265 tmpCmd.Parameters.Add (param);
// Pass 2: build the column-definition text.
271 bool insertSt = false;
272 foreach (DataRow row in colMetaData.Rows) {
273 SqlDbType sqlType = (SqlDbType) row ["ProviderType"];
274 if (_columnMappingCollection.Count > 0) {
277 foreach (SqlParameter param in tmpCmd.Parameters) {
278 if (ordinalMapping) {
279 foreach (SqlBulkCopyColumnMapping mapping
280 in _columnMappingCollection) {
281 if (mapping.DestinationOrdinal == i && param.Value == null) {
286 foreach (SqlBulkCopyColumnMapping mapping
287 in _columnMappingCollection) {
288 if (mapping.DestinationColumn == param.ParameterName &&
289 (string)row ["ColumnName"] == param.ParameterName) {
296 if (insertSt == true)
299 if (insertSt == false)
302 if ((bool)row ["IsReadOnly"]) {
306 int columnSize = (int)row ["ColumnSize"];
307 string columnInfo = "";
// Text/NText columns at or above the Tds maximum are declared as (max) types;
// other text types with a known size get "type(size)"; everything else is the bare type name.
309 if (columnSize >= TdsMetaParameter.maxVarCharCharacters && sqlType == SqlDbType.Text)
310 columnInfo = "VarChar(max)";
311 else if (columnSize >= TdsMetaParameter.maxNVarCharCharacters && sqlType == SqlDbType.NText)
312 columnInfo = "NVarChar(max)";
313 else if (IsTextType(sqlType) && columnSize != -1) {
314 columnInfo = string.Format ("{0}({1})",
316 columnSize.ToString());
318 columnInfo = string.Format ("{0}", sqlType);
// Decimal columns additionally carry "(precision,scale)".
321 if ( sqlType == SqlDbType.Decimal)
322 columnInfo += String.Format("({0},{1})", row ["NumericPrecision"], row ["NumericScale"]);
326 string columnName = (string) row ["ColumnName"];
327 statement += string.Format ("[{0}] {1}", columnName, columnInfo);
// Append the column's collation when the collation table lists it by name.
// tableCollations may be null, in which case no COLLATE clause is added.
330 if (IsTextType(sqlType) && tableCollations != null) {
331 foreach (DataRow collationRow in tableCollations.Rows) {
332 if ((string)collationRow ["name"] == columnName) {
333 statement += string.Format (" COLLATE {0}", collationRow ["collation"]);
// Placeholder for client-side validation of the column mappings against the source
// table and the destination collations. No active validation code is visible here:
// the original checks are kept below, commented out, for reference.
342 private void ValidateColumnMapping (DataTable table, DataTable tableCollations)
344 // The problem here is that temp tables will not have any table collations. This prevents
345 // us from bulk inserting into temp tables. So for now we skip the validation and
346 // let SQL Server tell us there is an issue rather than trying to detect it here.
347 // For now we simply return and do nothing.
348 // TODO: At some point we should remove this function if we all agree it's the right thing to do
351 // foreach (SqlBulkCopyColumnMapping _columnMapping in _columnMappingCollection) {
352 // if (ordinalMapping == false &&
353 // (_columnMapping.DestinationColumn == String.Empty ||
354 // _columnMapping.SourceColumn == String.Empty))
355 // throw new InvalidOperationException ("Mappings must be either all null or ordinal");
356 // if (ordinalMapping &&
357 // (_columnMapping.DestinationOrdinal == -1 ||
358 // _columnMapping.SourceOrdinal == -1))
359 // throw new InvalidOperationException ("Mappings must be either all null or ordinal");
360 // bool flag = false;
361 // if (ordinalMapping == false) {
362 // foreach (DataRow row in tableCollations.Rows) {
363 // if ((string)row ["name"] == _columnMapping.DestinationColumn) {
368 // if (flag == false)
369 // throw new InvalidOperationException ("ColumnMapping does not match");
371 // foreach (DataColumn col in table.Columns) {
372 // if (col.ColumnName == _columnMapping.SourceColumn) {
377 // if (flag == false)
378 // throw new InvalidOperationException ("ColumnName " +
379 // _columnMapping.SourceColumn +
380 // " does not match");
382 // if (_columnMapping.DestinationOrdinal >= tableCollations.Rows.Count)
383 // throw new InvalidOperationException ("ColumnMapping does not match");
// Core of every WriteToServer overload: copies the rows of `table` to the destination table.
// Steps visible below:
//   1. validate connection state and destination table name;
//   2. optionally issue "set identity_insert ... on" (KeepIdentity);
//   3. fetch destination metadata and build the "insert bulk" statement, including
//      a WITH (...) clause for the options in insertModifiers;
//   4. for each eligible source row, send every parameter value through TdsBulkCopy;
//   5. raise RowsCopied according to _notifyAfter, then end the bulk copy.
// `state` filters rows by RowState; 0 means "no filter". Deleted rows are never copied.
// Throws InvalidOperationException for a null/closed connection and
// ArgumentNullException when no destination table name has been set.
388 private void BulkCopyToServer (DataTable table, DataRowState state)
// NOTE(review): a Closed connection already throws here, so the isLocalConnection
// check a few lines below cannot be reached with a Closed connection.
390 if (connection == null || connection.State == ConnectionState.Closed)
391 throw new InvalidOperationException ("This method should not be called on a closed connection");
392 if (_destinationTableName == null)
393 throw new ArgumentNullException ("DestinationTableName");
394 if (isLocalConnection && connection.State != ConnectionState.Open)
// NOTE(review): identity_insert is switched on for table.TableName (the SOURCE table's
// name), while the insert below targets DestinationTableName — confirm this is intended.
397 if ((copyOptions & SqlBulkCopyOptions.KeepIdentity) == SqlBulkCopyOptions.KeepIdentity) {
398 SqlCommand cmd = new SqlCommand ("set identity_insert " +
399 table.TableName + " on",
401 cmd.ExecuteScalar ();
403 DataTable [] columnMetaDataTables = GetColumnMetaData ();
404 DataTable colMetaData = columnMetaDataTables [0];
405 DataTable tableCollations = columnMetaDataTables [1];
// The mapping mode is decided by the first mapping only: a source ordinal other
// than -1 selects ordinal mapping for the whole collection.
407 if (_columnMappingCollection.Count > 0) {
408 if (_columnMappingCollection [0].SourceOrdinal != -1)
409 ordinalMapping = true;
410 ValidateColumnMapping (table, tableCollations);
// tmpCmd is used only as a container for the per-column parameters.
413 SqlCommand tmpCmd = new SqlCommand ();
414 TdsBulkCopy blkCopy = new TdsBulkCopy ((Tds)connection.Tds);
415 if (((Tds)connection.Tds).TdsVersion >= TdsVersion.tds70) {
416 string statement = "insert bulk " + DestinationTableName + " (";
417 statement += GenerateColumnMetaData (tmpCmd, colMetaData, tableCollations);
420 #region Check requested options and add corresponding modifiers to the statement
421 if ((copyOptions & insertModifiers) != SqlBulkCopyOptions.Default) {
422 statement += " WITH (";
// commaRequired records that an earlier modifier was written (the separator
// logic itself is on lines not visible here).
423 bool commaRequired = false;
425 if ((copyOptions & SqlBulkCopyOptions.CheckConstraints) == SqlBulkCopyOptions.CheckConstraints) {
428 statement += "CHECK_CONSTRAINTS";
429 commaRequired = true;
432 if ((copyOptions & SqlBulkCopyOptions.TableLock) == SqlBulkCopyOptions.TableLock) {
435 statement += "TABLOCK";
436 commaRequired = true;
439 if ((copyOptions & SqlBulkCopyOptions.KeepNulls) == SqlBulkCopyOptions.KeepNulls) {
442 statement += "KEEP_NULLS";
443 commaRequired = true;
446 if ((copyOptions & SqlBulkCopyOptions.FireTriggers) == SqlBulkCopyOptions.FireTriggers) {
449 statement += "FIRE_TRIGGERS";
450 commaRequired = true;
455 #endregion Check requested options and add corresponding modifiers to the statement
457 blkCopy.SendColumnMetaData (statement);
459 blkCopy.BulkCopyStart (tmpCmd.Parameters.MetaParameters);
460 long noRowsCopied = 0;
461 foreach (DataRow row in table.Rows) {
462 if (row.RowState == DataRowState.Deleted)
463 continue; // Don't copy the row that's in deleted state
// With a non-zero state filter, rows in any other state are filtered
// (presumably skipped — the statement after this test is not visible).
464 if (state != 0 && row.RowState != state)
466 bool isNewRow = true;
468 foreach (SqlParameter param in tmpCmd.Parameters) {
470 object rowToCopy = null;
471 if (_columnMappingCollection.Count > 0) {
472 if (ordinalMapping) {
473 foreach (SqlBulkCopyColumnMapping mapping
474 in _columnMappingCollection) {
475 if (mapping.DestinationOrdinal == i && param.Value == null) {
476 rowToCopy = row [mapping.SourceOrdinal];
477 SqlParameter parameter = new SqlParameter (mapping.SourceOrdinal.ToString (),
// When the inferred source type differs from the destination column type,
// the value is converted to the destination's framework type.
479 if (param.MetaParameter.TypeName != parameter.MetaParameter.TypeName) {
480 parameter.SqlDbType = param.SqlDbType;
481 rowToCopy = parameter.Value = parameter.ConvertToFrameworkType (rowToCopy);
483 string colType = string.Format ("{0}", parameter.MetaParameter.TypeName);
// NOTE(review): the null checks below read row [i], while the value itself
// came from row [mapping.SourceOrdinal] — confirm these are meant to match.
484 if (colType == "nvarchar" || colType == "ntext" || colType == "nchar") {
485 if (row [i] != null && row [i] != DBNull.Value) {
486 size = ((string) parameter.Value).Length;
489 } else if (colType == "varchar" || colType == "text" || colType == "char") {
490 if (row [i] != null && row [i] != DBNull.Value)
491 size = ((string) parameter.Value).Length;
493 size = parameter.Size;
// Name-based mapping: same flow as above, keyed by column names.
499 foreach (SqlBulkCopyColumnMapping mapping
500 in _columnMappingCollection) {
501 if (mapping.DestinationColumn == param.ParameterName) {
502 rowToCopy = row [mapping.SourceColumn];
503 SqlParameter parameter = new SqlParameter (mapping.SourceColumn, rowToCopy);
504 if (param.MetaParameter.TypeName != parameter.MetaParameter.TypeName) {
505 parameter.SqlDbType = param.SqlDbType;
506 rowToCopy = parameter.Value = parameter.ConvertToFrameworkType (rowToCopy);
508 string colType = string.Format ("{0}", parameter.MetaParameter.TypeName);
509 if (colType == "nvarchar" || colType == "ntext" || colType == "nchar") {
510 if (row [mapping.SourceColumn] != null && row [mapping.SourceColumn] != DBNull.Value) {
511 size = ((string) rowToCopy).Length;
514 } else if (colType == "varchar" || colType == "text" || colType == "char") {
515 if (row [mapping.SourceColumn] != null && row [mapping.SourceColumn] != DBNull.Value)
516 size = ((string) rowToCopy).Length;
518 size = parameter.Size;
// No mappings: the source column is looked up by the destination column's name.
// NOTE(review): the string casts below have no visible null/DBNull guard, unlike
// the mapped branches above.
526 rowToCopy = row [param.ParameterName];
527 string colType = param.MetaParameter.TypeName;
529 If column type is SqlDbType.NVarChar the size of parameter is multiplied by 2
530 FIXME: Need to check for other types
532 if (colType == "nvarchar" || colType == "ntext" || colType == "nchar") {
533 size = ((string) row [param.ParameterName]).Length;
535 } else if (colType == "varchar" || colType == "text" || colType == "char") {
536 size = ((string) row [param.ParameterName]).Length;
541 if (rowToCopy == null)
544 blkCopy.BulkCopyData (rowToCopy, isNewRow, size, param.MetaParameter);
548 } // foreach (SqlParameter)
// Notification is active only when NotifyAfter is positive.
549 if (_notifyAfter > 0) {
551 if (noRowsCopied >= _notifyAfter) {
552 RowsCopied (noRowsCopied);
556 } // foreach (DataRow)
557 blkCopy.BulkCopyEnd ();
/// <summary>Tells whether a SQL type is one of the character-string types.</summary>
/// <param name="sqlType">Type to classify.</param>
/// <returns>
/// true for NText, NVarChar, Text, VarChar, Char and NChar; false for every other type.
/// </returns>
private bool IsTextType(SqlDbType sqlType)
{
	switch (sqlType) {
	case SqlDbType.NText:
	case SqlDbType.NVarChar:
	case SqlDbType.Text:
	case SqlDbType.VarChar:
	case SqlDbType.Char:
	case SqlDbType.NChar:
		return true;
	default:
		return false;
	}
}
/// <summary>
/// Copies the given rows to the destination table. The rows are first copied into a
/// temporary DataTable shaped like the table of the first row.
/// </summary>
/// <param name="rows">Rows to copy; an empty array is a no-op.</param>
/// <exception cref="ArgumentNullException"><paramref name="rows"/> is null.</exception>
public void WriteToServer (DataRow [] rows)
{
	if (rows == null)
		throw new ArgumentNullException ("rows");
	if (rows.Length == 0)
		return;

	// The first row's table supplies the name and column layout for the staging table.
	DataTable sourceTable = rows [0].Table;
	DataTable staging = new DataTable (sourceTable.TableName);
	foreach (DataColumn sourceColumn in sourceTable.Columns) {
		staging.Columns.Add (new DataColumn (sourceColumn.ColumnName, sourceColumn.DataType));
	}

	int columnCount = staging.Columns.Count;
	foreach (DataRow sourceRow in rows) {
		DataRow copy = staging.NewRow ();
		for (int ordinal = 0; ordinal < columnCount; ordinal++) {
			copy [ordinal] = sourceRow [ordinal];
		}
		staging.Rows.Add (copy);
	}

	BulkCopyToServer (staging, 0);
}
/// <summary>Copies all non-deleted rows of a DataTable to the destination table.</summary>
/// <param name="table">Source table; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="table"/> is null.</exception>
public void WriteToServer (DataTable table)
{
	// Reject null up front: otherwise the failure only surfaces inside the copy
	// routine, after commands have already been sent to the server.
	if (table == null)
		throw new ArgumentNullException ("table");

	// A row-state filter of 0 means "no filter".
	BulkCopyToServer (table, 0);
}
/// <summary>
/// Copies every row produced by a data reader to the destination table. The reader
/// is first loaded in full into an in-memory DataTable named "SourceTable".
/// </summary>
/// <param name="reader">Source reader; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="reader"/> is null.</exception>
public void WriteToServer (IDataReader reader)
{
	// Reject null up front instead of failing inside the adapter fill.
	if (reader == null)
		throw new ArgumentNullException ("reader");

	DataTable table = new DataTable ("SourceTable");
	SqlDataAdapter adapter = new SqlDataAdapter ();
	adapter.FillInternal (table, reader);

	// A row-state filter of 0 means "no filter".
	BulkCopyToServer (table, 0);
}
/// <summary>
/// Copies the rows of a DataTable that are in the given row state to the destination table.
/// </summary>
/// <param name="table">Source table; must not be null.</param>
/// <param name="rowState">
/// Row state to copy. A value of 0 disables the filter; deleted rows are never copied.
/// </param>
/// <exception cref="ArgumentNullException"><paramref name="table"/> is null.</exception>
public void WriteToServer (DataTable table, DataRowState rowState)
{
	// Reject null up front: otherwise the failure only surfaces inside the copy
	// routine, after commands have already been sent to the server.
	if (table == null)
		throw new ArgumentNullException ("table");

	BulkCopyToServer (table, rowState);
}
/// <summary>Not implemented: always throws.</summary>
/// <param name="reader">Unused.</param>
/// <exception cref="NotImplementedException">Thrown on every call.</exception>
public void WriteToServer (DbDataReader reader)
{
	throw new NotImplementedException ();
}
/// <summary>Not implemented: throws synchronously instead of returning a task.</summary>
/// <param name="reader">Unused.</param>
/// <exception cref="NotImplementedException">Thrown on every call.</exception>
public Task WriteToServerAsync (DbDataReader reader)
{
	throw new NotImplementedException ();
}
/// <summary>Not implemented: throws synchronously instead of returning a task.</summary>
/// <param name="reader">Unused.</param>
/// <param name="cancellationToken">Unused.</param>
/// <exception cref="NotImplementedException">Thrown on every call.</exception>
public Task WriteToServerAsync (DbDataReader reader, CancellationToken cancellationToken)
{
	throw new NotImplementedException ();
}
/// <summary>Raises the SqlRowsCopied event, if anyone is subscribed.</summary>
/// <param name="rowsCopied">Number of rows copied so far, passed on to the handlers.</param>
private void RowsCopied (long rowsCopied)
{
	// Snapshot the delegate so a concurrent unsubscribe between the null test
	// and the invocation cannot cause a NullReferenceException.
	SqlRowsCopiedEventHandler handler = SqlRowsCopied;
	if (handler == null)
		return;

	SqlRowsCopiedEventArgs e = new SqlRowsCopiedEventArgs (rowsCopied);

	// Mark that a handler is running so the existing sqlRowsCopied guard, which
	// rejects closing from inside the event, can actually take effect.
	// The flag is cleared even when a handler throws.
	sqlRowsCopied = true;
	try {
		handler (this, e);
	} finally {
		sqlRowsCopied = false;
	}
}
// Raised by RowsCopied with the running count of copied rows; the copy loop calls
// RowsCopied only when NotifyAfter is greater than zero.
639 public event SqlRowsCopiedEventHandler SqlRowsCopied;
643 void IDisposable.Dispose ()
645 //throw new NotImplementedException ();
646 if (isLocalConnection) {