2 // System.Data.SqlClient.SqlBulkCopy.cs
5 // Nagappan A (anagappan@novell.com)
7 // (C) Novell, Inc 2007
10 // Copyright (C) 2007 Novell, Inc (http://www.novell.com)
12 // Permission is hereby granted, free of charge, to any person obtaining
13 // a copy of this software and associated documentation files (the
14 // "Software"), to deal in the Software without restriction, including
15 // without limitation the rights to use, copy, modify, merge, publish,
16 // distribute, sublicense, and/or sell copies of the Software, and to
17 // permit persons to whom the Software is furnished to do so, subject to
18 // the following conditions:
20 // The above copyright notice and this permission notice shall be
21 // included in all copies or substantial portions of the Software.
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
35 using System.Data.Common;
37 using Mono.Data.Tds.Protocol;
39 namespace System.Data.SqlClient {
40 /// <summary>Efficient way to bulk load SQL Server table with several data rows at once</summary>
41 public sealed class SqlBulkCopy : IDisposable
44 private const string transConflictMessage = "Must not specify SqlBulkCopyOptions.UseInternalTransaction " +
45 "and pass an external Transaction at the same time.";
47 private const SqlBulkCopyOptions insertModifiers =
48 SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.TableLock |
49 SqlBulkCopyOptions.KeepNulls | SqlBulkCopyOptions.FireTriggers;
54 private int _batchSize = 0;
55 private int _notifyAfter = 0;
56 private int _bulkCopyTimeout = 0;
57 private SqlBulkCopyColumnMappingCollection _columnMappingCollection = new SqlBulkCopyColumnMappingCollection ();
58 private string _destinationTableName = null;
59 private bool ordinalMapping = false;
60 private bool sqlRowsCopied = false;
61 private bool isLocalConnection = false;
62 private SqlConnection connection;
63 private SqlTransaction externalTransaction;
64 private SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default;
69 public SqlBulkCopy (SqlConnection connection)
71 if (connection == null) {
72 throw new ArgumentNullException("connection");
75 this.connection = connection;
78 public SqlBulkCopy (string connectionString)
80 if (connectionString == null) {
81 throw new ArgumentNullException("connectionString");
84 this.connection = new SqlConnection (connectionString);
85 isLocalConnection = true;
89 public SqlBulkCopy (string connectionString, SqlBulkCopyOptions copyOptions)
91 if (connectionString == null) {
92 throw new ArgumentNullException ("connectionString");
95 this.connection = new SqlConnection (connectionString);
96 isLocalConnection = true;
98 if ((copyOptions & SqlBulkCopyOptions.UseInternalTransaction) == SqlBulkCopyOptions.UseInternalTransaction)
99 throw new NotImplementedException ("We don't know how to process UseInternalTransaction option.");
101 this.copyOptions = copyOptions;
105 public SqlBulkCopy (SqlConnection connection, SqlBulkCopyOptions copyOptions, SqlTransaction externalTransaction)
107 if (connection == null) {
108 throw new ArgumentNullException ("connection");
111 this.connection = connection;
112 this.copyOptions = copyOptions;
114 if ((copyOptions & SqlBulkCopyOptions.UseInternalTransaction) == SqlBulkCopyOptions.UseInternalTransaction) {
115 if (externalTransaction != null)
116 throw new ArgumentException (transConflictMessage);
119 this.externalTransaction = externalTransaction;
121 if ((copyOptions & SqlBulkCopyOptions.UseInternalTransaction) == SqlBulkCopyOptions.UseInternalTransaction)
122 throw new NotImplementedException ("We don't know how to process UseInternalTransaction option.");
124 this.copyOptions = copyOptions;
131 public int BatchSize {
132 get { return _batchSize; }
133 set { _batchSize = value; }
136 public int BulkCopyTimeout {
137 get { return _bulkCopyTimeout; }
138 set { _bulkCopyTimeout = value; }
141 public SqlBulkCopyColumnMappingCollection ColumnMappings {
142 get { return _columnMappingCollection; }
145 public string DestinationTableName {
146 get { return _destinationTableName; }
147 set { _destinationTableName = value; }
150 public int NotifyAfter {
151 get { return _notifyAfter; }
154 throw new ArgumentOutOfRangeException ("NotifyAfter should be greater than or equal to 0");
155 _notifyAfter = value;
165 if (sqlRowsCopied == true) {
166 throw new InvalidOperationException ("Close should not be called from SqlRowsCopied event");
168 if (connection == null || connection.State == ConnectionState.Closed) {
174 private DataTable [] GetColumnMetaData ()
176 DataTable [] columnMetaDataTables = new DataTable [2];
177 SqlCommand cmd = new SqlCommand ("select @@trancount; " +
178 "set fmtonly on select * from " +
179 DestinationTableName + " set fmtonly off;" +
180 "exec sp_tablecollations_90 '" +
181 DestinationTableName + "'",
183 SqlDataReader reader = cmd.ExecuteReader ();
184 int i = 0; // Skipping 1st result
187 columnMetaDataTables [i - 1] = reader.GetSchemaTable ();
189 SqlDataAdapter adapter = new SqlDataAdapter ();
190 adapter.MissingSchemaAction = MissingSchemaAction.AddWithKey;
191 columnMetaDataTables [i - 1] = new DataTable ();
192 adapter.FillInternal (columnMetaDataTables [i - 1], reader);
195 } while (reader.IsClosed == false && reader.NextResult());
197 return columnMetaDataTables;
200 private string GenerateColumnMetaData (SqlCommand tmpCmd, DataTable colMetaData, DataTable tableCollations)
203 string statement = "";
205 foreach (DataRow row in colMetaData.Rows) {
207 foreach (DataColumn col in colMetaData.Columns) { // FIXME: This line not required, remove later
209 if (_columnMappingCollection.Count > 0) {
210 if (ordinalMapping) {
211 foreach (SqlBulkCopyColumnMapping mapping
212 in _columnMappingCollection) {
213 if (mapping.DestinationOrdinal == i) {
219 foreach (SqlBulkCopyColumnMapping mapping
220 in _columnMappingCollection) {
221 if (mapping.DestinationColumn == (string) row ["ColumnName"]) {
230 if ((bool)row ["IsReadOnly"]) {
236 SqlParameter param = new SqlParameter ((string) row ["ColumnName"],
237 ((SqlDbType) row ["ProviderType"]));
239 if ((int)row ["ColumnSize"] != -1) {
240 param.Size = (int) row ["ColumnSize"];
242 tmpCmd.Parameters.Add (param);
248 bool insertSt = false;
249 foreach (DataRow row in colMetaData.Rows) {
250 if (_columnMappingCollection.Count > 0) {
253 foreach (SqlParameter param in tmpCmd.Parameters) {
254 if (ordinalMapping) {
255 foreach (SqlBulkCopyColumnMapping mapping
256 in _columnMappingCollection) {
257 if (mapping.DestinationOrdinal == i && param.Value == null) {
262 foreach (SqlBulkCopyColumnMapping mapping
263 in _columnMappingCollection) {
264 if (mapping.DestinationColumn == param.ParameterName &&
265 (string)row ["ColumnName"] == param.ParameterName) {
272 if (insertSt == true)
275 if (insertSt == false)
278 if ((bool)row ["IsReadOnly"]) {
281 string columnInfo = "";
282 if ((int)row ["ColumnSize"] != -1) {
283 columnInfo = string.Format ("{0}({1})",
284 (SqlDbType) row ["ProviderType"],
287 columnInfo = string.Format ("{0}", (SqlDbType) row ["ProviderType"]);
291 string columnName = (string) row ["ColumnName"];
292 statement += string.Format ("[{0}] {1}", columnName, columnInfo);
295 if (tableCollations != null) {
296 foreach (DataRow collationRow in tableCollations.Rows) {
297 if ((string)collationRow ["name"] == columnName) {
298 statement += string.Format (" COLLATE {0}", collationRow ["collation"]);
307 private void ValidateColumnMapping (DataTable table, DataTable tableCollations)
309 foreach (SqlBulkCopyColumnMapping _columnMapping in _columnMappingCollection) {
310 if (ordinalMapping == false &&
311 (_columnMapping.DestinationColumn == String.Empty ||
312 _columnMapping.SourceColumn == String.Empty))
313 throw new InvalidOperationException ("Mappings must be either all null or ordinal");
314 if (ordinalMapping &&
315 (_columnMapping.DestinationOrdinal == -1 ||
316 _columnMapping.SourceOrdinal == -1))
317 throw new InvalidOperationException ("Mappings must be either all null or ordinal");
319 if (ordinalMapping == false) {
320 foreach (DataRow row in tableCollations.Rows) {
321 if ((string)row ["name"] == _columnMapping.DestinationColumn) {
327 throw new InvalidOperationException ("ColumnMapping does not match");
329 foreach (DataColumn col in table.Columns) {
330 if (col.ColumnName == _columnMapping.SourceColumn) {
336 throw new InvalidOperationException ("ColumnName " +
337 _columnMapping.SourceColumn +
340 if (_columnMapping.DestinationOrdinal >= tableCollations.Rows.Count)
341 throw new InvalidOperationException ("ColumnMapping does not match");
346 private void BulkCopyToServer (DataTable table, DataRowState state)
348 if (connection == null || connection.State == ConnectionState.Closed)
349 throw new InvalidOperationException ("This method should not be called on a closed connection");
350 if (_destinationTableName == null)
351 throw new ArgumentNullException ("DestinationTableName");
352 if (isLocalConnection && connection.State != ConnectionState.Open)
355 if ((copyOptions & SqlBulkCopyOptions.KeepIdentity) == SqlBulkCopyOptions.KeepIdentity) {
356 SqlCommand cmd = new SqlCommand ("set identity_insert " +
357 table.TableName + " on",
359 cmd.ExecuteScalar ();
361 DataTable [] columnMetaDataTables = GetColumnMetaData ();
362 DataTable colMetaData = columnMetaDataTables [0];
363 DataTable tableCollations = columnMetaDataTables [1];
365 if (_columnMappingCollection.Count > 0) {
366 if (_columnMappingCollection [0].SourceOrdinal != -1)
367 ordinalMapping = true;
368 ValidateColumnMapping (table, tableCollations);
371 SqlCommand tmpCmd = new SqlCommand ();
372 TdsBulkCopy blkCopy = new TdsBulkCopy ((Tds)connection.Tds);
373 if (((Tds)connection.Tds).TdsVersion >= TdsVersion.tds70) {
374 string statement = "insert bulk " + DestinationTableName + " (";
375 statement += GenerateColumnMetaData (tmpCmd, colMetaData, tableCollations);
378 #region Check requested options and add corresponding modifiers to the statement
379 if ((copyOptions & insertModifiers) != SqlBulkCopyOptions.Default) {
380 statement += " WITH (";
381 bool commaRequired = false;
383 if ((copyOptions & SqlBulkCopyOptions.CheckConstraints) == SqlBulkCopyOptions.CheckConstraints) {
386 statement += "CHECK_CONSTRAINTS";
387 commaRequired = true;
390 if ((copyOptions & SqlBulkCopyOptions.TableLock) == SqlBulkCopyOptions.TableLock) {
393 statement += "TABLOCK";
394 commaRequired = true;
397 if ((copyOptions & SqlBulkCopyOptions.KeepNulls) == SqlBulkCopyOptions.KeepNulls) {
400 statement += "KEEP_NULLS";
401 commaRequired = true;
404 if ((copyOptions & SqlBulkCopyOptions.FireTriggers) == SqlBulkCopyOptions.FireTriggers) {
407 statement += "FIRE_TRIGGERS";
408 commaRequired = true;
413 #endregion Check requested options and add corresponding modifiers to the statement
415 blkCopy.SendColumnMetaData (statement);
417 blkCopy.BulkCopyStart (tmpCmd.Parameters.MetaParameters);
418 long noRowsCopied = 0;
419 foreach (DataRow row in table.Rows) {
420 if (row.RowState == DataRowState.Deleted)
421 continue; // Don't copy the row that's in deleted state
422 if (state != 0 && row.RowState != state)
424 bool isNewRow = true;
426 foreach (SqlParameter param in tmpCmd.Parameters) {
428 object rowToCopy = null;
429 if (_columnMappingCollection.Count > 0) {
430 if (ordinalMapping) {
431 foreach (SqlBulkCopyColumnMapping mapping
432 in _columnMappingCollection) {
433 if (mapping.DestinationOrdinal == i && param.Value == null) {
434 rowToCopy = row [mapping.SourceOrdinal];
435 SqlParameter parameter = new SqlParameter (mapping.SourceOrdinal.ToString (),
437 if (param.MetaParameter.TypeName != parameter.MetaParameter.TypeName) {
438 parameter.SqlDbType = param.SqlDbType;
439 rowToCopy = parameter.Value = parameter.ConvertToFrameworkType (rowToCopy);
441 string colType = string.Format ("{0}", parameter.MetaParameter.TypeName);
442 if (colType == "nvarchar") {
443 if (row [i] != null) {
444 size = ((string) parameter.Value).Length;
448 size = parameter.Size;
454 foreach (SqlBulkCopyColumnMapping mapping
455 in _columnMappingCollection) {
456 if (mapping.DestinationColumn == param.ParameterName) {
457 rowToCopy = row [mapping.SourceColumn];
458 SqlParameter parameter = new SqlParameter (mapping.SourceColumn, rowToCopy);
459 if (param.MetaParameter.TypeName != parameter.MetaParameter.TypeName) {
460 parameter.SqlDbType = param.SqlDbType;
461 rowToCopy = parameter.Value = parameter.ConvertToFrameworkType (rowToCopy);
463 string colType = string.Format ("{0}", parameter.MetaParameter.TypeName);
464 if (colType == "nvarchar") {
465 if (row [mapping.SourceColumn] != null) {
466 size = ((string) rowToCopy).Length;
470 size = parameter.Size;
478 rowToCopy = row [param.ParameterName];
479 string colType = param.MetaParameter.TypeName;
481 If column type is SqlDbType.NVarChar the size of parameter is multiplied by 2
482 FIXME: Need to check for other types
484 if (colType == "nvarchar") {
485 size = ((string) row [param.ParameterName]).Length;
491 if (rowToCopy == null)
493 blkCopy.BulkCopyData (rowToCopy, size, isNewRow);
496 } // foreach (SqlParameter)
497 if (_notifyAfter > 0) {
499 if (noRowsCopied >= _notifyAfter) {
500 RowsCopied (noRowsCopied);
504 } // foreach (DataRow)
505 blkCopy.BulkCopyEnd ();
508 public void WriteToServer (DataRow [] rows)
511 throw new ArgumentNullException ("rows");
512 if (rows.Length == 0)
514 DataTable table = new DataTable (rows [0].Table.TableName);
515 foreach (DataColumn col in rows [0].Table.Columns) {
516 DataColumn tmpCol = new DataColumn (col.ColumnName, col.DataType);
517 table.Columns.Add (tmpCol);
519 foreach (DataRow row in rows) {
520 DataRow tmpRow = table.NewRow ();
521 for (int i = 0; i < table.Columns.Count; i++) {
522 tmpRow [i] = row [i];
524 table.Rows.Add (tmpRow);
526 BulkCopyToServer (table, 0);
529 public void WriteToServer (DataTable table)
531 BulkCopyToServer (table, 0);
534 public void WriteToServer (IDataReader reader)
536 DataTable table = new DataTable ();
537 SqlDataAdapter adapter = new SqlDataAdapter ();
538 adapter.FillInternal (table, reader);
539 BulkCopyToServer (table, 0);
542 public void WriteToServer (DataTable table, DataRowState rowState)
544 BulkCopyToServer (table, rowState);
547 private void RowsCopied (long rowsCopied)
549 SqlRowsCopiedEventArgs e = new SqlRowsCopiedEventArgs (rowsCopied);
550 if (null != SqlRowsCopied) {
551 SqlRowsCopied (this, e);
559 public event SqlRowsCopiedEventHandler SqlRowsCopied;
563 void IDisposable.Dispose ()
565 //throw new NotImplementedException ();
566 if (isLocalConnection) {