2 // System.Data.SqlClient.SqlBulkCopy.cs
5 // Nagappan A (anagappan@novell.com)
7 // (C) Novell, Inc 2007
10 // Copyright (C) 2007 Novell, Inc (http://www.novell.com)
12 // Permission is hereby granted, free of charge, to any person obtaining
13 // a copy of this software and associated documentation files (the
14 // "Software"), to deal in the Software without restriction, including
15 // without limitation the rights to use, copy, modify, merge, publish,
16 // distribute, sublicense, and/or sell copies of the Software, and to
17 // permit persons to whom the Software is furnished to do so, subject to
18 // the following conditions:
20 // The above copyright notice and this permission notice shall be
21 // included in all copies or substantial portions of the Software.
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
35 using System.Data.Common;
37 using Mono.Data.Tds.Protocol;
39 namespace System.Data.SqlClient {
40 /// <summary>Efficient way to bulk load SQL Server table with several data rows at once</summary>
41 public sealed class SqlBulkCopy : IDisposable
// Message used when UseInternalTransaction is combined with a caller-supplied transaction.
private const string transConflictMessage =
    "Must not specify SqlBulkCopyOptions.UseInternalTransaction and pass an external Transaction at the same time.";

// Backing stores for the public properties.
private int _batchSize;
private int _bulkCopyTimeout;
private int _notifyAfter;
private string _destinationTableName;
private SqlBulkCopyColumnMappingCollection _columnMappingCollection = new SqlBulkCopyColumnMappingCollection ();

// Connection-related state.
private SqlConnection connection;
// True when this instance built the connection itself from a connection string.
private bool isLocalConnection;
private SqlTransaction externalTransaction;
private SqlBulkCopyOptions copyOptions = SqlBulkCopyOptions.Default;

// Switched on in BulkCopyToServer when the first column mapping carries a source ordinal.
private bool ordinalMapping;

// These two flags are only read in the code visible here; the explicit initializers are
// kept so the compiler does not report them as never assigned.
private bool sqlRowsCopied = false;
private bool identityInsert = false;
/// <summary>Creates a bulk copy object that works over a connection supplied by the caller.</summary>
/// <param name="connection">The connection to use; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="connection"/> is null.</exception>
public SqlBulkCopy (SqlConnection connection)
{
    if (connection == null)
        throw new ArgumentNullException ("connection");

    this.connection = connection;
}
/// <summary>Creates a bulk copy object that builds its own connection from a connection string.</summary>
/// <param name="connectionString">Connection string handed to a new SqlConnection; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="connectionString"/> is null.</exception>
public SqlBulkCopy (string connectionString)
{
    if (connectionString == null)
        throw new ArgumentNullException ("connectionString");

    SqlConnection ownConnection = new SqlConnection (connectionString);
    this.connection = ownConnection;
    // Remember that the connection was created here rather than passed in.
    this.isLocalConnection = true;
}
/// <summary>
/// Creates a bulk copy object that builds its own connection and applies the given options.
/// Only SqlBulkCopyOptions.Default is accepted.
/// </summary>
/// <param name="connectionString">Connection string handed to a new SqlConnection; must not be null.</param>
/// <param name="copyOptions">Copy options; anything other than Default is rejected.</param>
/// <exception cref="ArgumentNullException"><paramref name="connectionString"/> is null.</exception>
/// <exception cref="NotImplementedException"><paramref name="copyOptions"/> is not Default.</exception>
public SqlBulkCopy (string connectionString, SqlBulkCopyOptions copyOptions)
{
    if (connectionString == null)
        throw new ArgumentNullException ("connectionString");

    this.connection = new SqlConnection (connectionString);
    this.isLocalConnection = true;

    // The options are checked only after the connection object has been created.
    if (copyOptions != SqlBulkCopyOptions.Default)
        throw new NotImplementedException ("We don't know how to process non-default copyOptions.");

    this.copyOptions = copyOptions;
}
/// <summary>
/// Creates a bulk copy object over a caller-supplied connection, with options and an
/// optional external transaction. Only SqlBulkCopyOptions.Default is accepted.
/// </summary>
/// <param name="connection">The connection to use; must not be null.</param>
/// <param name="copyOptions">Copy options; anything other than Default is rejected.</param>
/// <param name="externalTransaction">Transaction supplied by the caller, or null.</param>
/// <exception cref="ArgumentNullException"><paramref name="connection"/> is null.</exception>
/// <exception cref="ArgumentException">
/// UseInternalTransaction is requested together with a non-null external transaction.
/// </exception>
/// <exception cref="NotImplementedException"><paramref name="copyOptions"/> is not Default.</exception>
public SqlBulkCopy (SqlConnection connection, SqlBulkCopyOptions copyOptions, SqlTransaction externalTransaction)
{
    if (connection == null)
        throw new ArgumentNullException ("connection");

    this.connection = connection;
    this.copyOptions = copyOptions;

    bool internalTransactionRequested =
        (copyOptions & SqlBulkCopyOptions.UseInternalTransaction) == SqlBulkCopyOptions.UseInternalTransaction;
    if (internalTransactionRequested && externalTransaction != null)
        throw new ArgumentException (transConflictMessage);

    // When an internal transaction was requested, the value reaching this point is null,
    // which is also the field's initial value.
    this.externalTransaction = externalTransaction;

    // The conflict check above runs first, so it wins over this rejection.
    if (copyOptions != SqlBulkCopyOptions.Default)
        throw new NotImplementedException ("We don't know how to process non-default copyOptions.");
}
/// <summary>Gets or sets the batch size. The value is stored as given, without validation.</summary>
public int BatchSize
{
    get
    {
        return _batchSize;
    }
    set
    {
        _batchSize = value;
    }
}
/// <summary>Gets or sets the bulk copy timeout. The value is stored as given, without validation.</summary>
public int BulkCopyTimeout
{
    get
    {
        return _bulkCopyTimeout;
    }
    set
    {
        _bulkCopyTimeout = value;
    }
}
/// <summary>Gets the collection of column mappings consulted while copying. Read-only property.</summary>
public SqlBulkCopyColumnMappingCollection ColumnMappings
{
    get
    {
        return _columnMappingCollection;
    }
}
/// <summary>
/// Gets or sets the name of the destination table. BulkCopyToServer refuses to run
/// while this is null.
/// </summary>
public string DestinationTableName
{
    get
    {
        return _destinationTableName;
    }
    set
    {
        _destinationTableName = value;
    }
}
/// <summary>
/// Gets or sets the row count consulted by BulkCopyToServer when deciding whether to
/// raise SqlRowsCopied. Notification is only considered when the value is greater than zero.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">The assigned value is negative.</exception>
public int NotifyAfter
{
    get { return _notifyAfter; }
    set
    {
        // The single-string constructor treats its argument as the parameter name, so the
        // descriptive text was being reported as a parameter name. Pass both explicitly.
        if (value < 0)
            throw new ArgumentOutOfRangeException ("value", "NotifyAfter should be greater than or equal to 0");
        _notifyAfter = value;
    }
}
// NOTE(review): the method header for this fragment is not visible in this view; judging by
// the error message below it is presumably the body of Close () -- confirm against the full file.
// Rejects the call while the sqlRowsCopied flag is set.
170 if (sqlRowsCopied == true) {
171 throw new InvalidOperationException ("Close should not be called from SqlRowsCopied event");
// Tests for a missing or already-closed connection; the statements guarded by this
// condition are not visible here.
173 if (connection == null || connection.State == ConnectionState.Closed) {
/// <summary>
/// Runs one batch against the server to collect metadata for DestinationTableName.
/// The batch yields three results: @@trancount (ignored), a FMTONLY select over the
/// destination table, and the output of sp_tablecollations_90.
/// </summary>
/// <returns>
/// A two-element array: element 0 is the schema table of the FMTONLY select, element 1 is a
/// DataTable filled with the rows returned by sp_tablecollations_90. An element stays null
/// if the corresponding result is never reached.
/// </returns>
private DataTable [] GetColumnMetaData ()
{
    DataTable [] columnMetaDataTables = new DataTable [2];

    // The name is embedded in a T-SQL string literal for the procedure call, so any single
    // quote inside it must be doubled or the batch is malformed.
    string tableName = DestinationTableName;
    string literalName = tableName == null ? tableName : tableName.Replace ("'", "''");

    SqlCommand cmd = new SqlCommand ("select @@trancount; " +
                                     "set fmtonly on select * from " +
                                     tableName + " set fmtonly off;" +
                                     "exec sp_tablecollations_90 '" +
                                     literalName + "'",
                                     connection);
    SqlDataReader reader = cmd.ExecuteReader ();
    try {
        int i = 0; // Skipping 1st result
        do {
            if (i == 1) {
                // Second result: column schema of the destination table.
                columnMetaDataTables [i - 1] = reader.GetSchemaTable ();
            } else if (i == 2) {
                // Third result: per-column collation rows.
                SqlDataAdapter adapter = new SqlDataAdapter ();
                adapter.MissingSchemaAction = MissingSchemaAction.AddWithKey;
                columnMetaDataTables [i - 1] = new DataTable ();
                adapter.FillInternal (columnMetaDataTables [i - 1], reader);
            }
            i++;
            // The reader may already be closed at this point, hence the IsClosed test.
        } while (reader.IsClosed == false && reader.NextResult ());
    } finally {
        // Release the reader even when reading a result throws, so the connection
        // is not left with a reader still open on it.
        reader.Close ();
    }
    return columnMetaDataTables;
}
// Builds the column-definition text for the bulk insert statement and registers one
// SqlParameter per selected destination column on tmpCmd.
//   tmpCmd          - command whose Parameters collection receives the new parameters.
//   colMetaData     - schema rows read via "ColumnName", "ProviderType", "ColumnSize", "IsReadOnly".
//   tableCollations - optional rows read via "name" and "collation"; may be null.
// The caller appends the result to "insert bulk <table> (". The return statement is not
// visible in this view; presumably `statement` is returned -- confirm.
// NOTE(review): several lines of this method (declaration of `i`, flag handling, breaks and
// continues) are not visible here, so the notes below cover only what the visible lines show.
205 private string GenerateColumnMetaData (SqlCommand tmpCmd, DataTable colMetaData, DataTable tableCollations)
208 string statement = "";
// First pass: decide per schema row whether a parameter is created for it.
210 foreach (DataRow row in colMetaData.Rows) {
212 foreach (DataColumn col in colMetaData.Columns) { // FIXME: This line not required, remove later
// With explicit mappings, the row is matched against them either by destination ordinal
// or by destination column name; the action taken on a match is not visible here.
214 if (_columnMappingCollection.Count > 0) {
215 if (ordinalMapping) {
216 foreach (SqlBulkCopyColumnMapping mapping
217 in _columnMappingCollection) {
218 if (mapping.DestinationOrdinal == i) {
224 foreach (SqlBulkCopyColumnMapping mapping
225 in _columnMappingCollection) {
226 if (mapping.DestinationColumn == (string) row ["ColumnName"]) {
// Read-only columns get special treatment; the body of this branch is not visible here.
235 if ((bool)row ["IsReadOnly"]) {
// The parameter is named after the column and typed from the schema's ProviderType.
241 SqlParameter param = new SqlParameter ((string) row ["ColumnName"],
242 ((SqlDbType) row ["ProviderType"]));
// A ColumnSize of -1 leaves the parameter size unset.
244 if ((int)row ["ColumnSize"] != -1) {
245 param.Size = (int) row ["ColumnSize"];
247 tmpCmd.Parameters.Add (param);
// Second pass: emit the "[name] type" text for each schema row.
253 bool insertSt = false;
254 foreach (DataRow row in colMetaData.Rows) {
255 if (_columnMappingCollection.Count > 0) {
// Looks for a parameter/mapping pair that corresponds to this row; insertSt is presumably
// set on a match (the assignments are not visible here).
258 foreach (SqlParameter param in tmpCmd.Parameters) {
259 if (ordinalMapping) {
260 foreach (SqlBulkCopyColumnMapping mapping
261 in _columnMappingCollection) {
262 if (mapping.DestinationOrdinal == i && param.Value == null) {
267 foreach (SqlBulkCopyColumnMapping mapping
268 in _columnMappingCollection) {
269 if (mapping.DestinationColumn == param.ParameterName &&
270 (string)row ["ColumnName"] == param.ParameterName) {
277 if (insertSt == true)
280 if (insertSt == false)
// Read-only columns are tested again here; presumably they are left out of the statement -- confirm.
283 if ((bool)row ["IsReadOnly"]) {
// Type text is "Type(size)" when a size is known, otherwise just "Type".
286 string columnInfo = "";
287 if ((int)row ["ColumnSize"] != -1) {
288 columnInfo = string.Format ("{0}({1})",
289 (SqlDbType) row ["ProviderType"],
292 columnInfo = string.Format ("{0}", (SqlDbType) row ["ProviderType"]);
296 string columnName = (string) row ["ColumnName"];
297 statement += string.Format ("[{0}] {1}", columnName, columnInfo);
// Appends a COLLATE clause when the collation table has a row whose "name" equals this column.
300 if (tableCollations != null) {
301 foreach (DataRow collationRow in tableCollations.Rows) {
302 if ((string)collationRow ["name"] == columnName) {
303 statement += string.Format (" COLLATE {0}", collationRow ["collation"]);
// Checks every configured column mapping against the source table and the destination
// metadata, throwing InvalidOperationException on the first mismatch.
//   table           - source table; its Columns are searched by SourceColumn name.
//   tableCollations - destination rows; searched by "name", and counted for the ordinal range check.
// NOTE(review): the lines that track whether a match was found are not visible in this
// view; the conditions guarding the "does not match" throws are therefore not shown.
312 private void ValidateColumnMapping (DataTable table, DataTable tableCollations)
314 foreach (SqlBulkCopyColumnMapping _columnMapping in _columnMappingCollection) {
// Name-based mode: both column names must be non-empty.
315 if (ordinalMapping == false &&
316 (_columnMapping.DestinationColumn == String.Empty ||
317 _columnMapping.SourceColumn == String.Empty))
318 throw new InvalidOperationException ("Mappings must be either all null or ordinal");
// Ordinal mode: both ordinals must be set (-1 is the unset value tested here).
319 if (ordinalMapping &&
320 (_columnMapping.DestinationOrdinal == -1 ||
321 _columnMapping.SourceOrdinal == -1))
322 throw new InvalidOperationException ("Mappings must be either all null or ordinal");
324 if (ordinalMapping == false) {
// Look for the destination column among the destination metadata rows.
325 foreach (DataRow row in tableCollations.Rows) {
326 if ((string)row ["name"] == _columnMapping.DestinationColumn) {
332 throw new InvalidOperationException ("ColumnMapping does not match");
// Look for the source column among the source table's columns.
334 foreach (DataColumn col in table.Columns) {
335 if (col.ColumnName == _columnMapping.SourceColumn) {
341 throw new InvalidOperationException ("ColumnName " +
342 _columnMapping.SourceColumn +
// Ordinal mode: the destination ordinal must fall inside the destination row count.
// NOTE(review): in the visible lines SourceOrdinal is not range-checked against table.Columns.
345 if (_columnMapping.DestinationOrdinal >= tableCollations.Rows.Count)
346 throw new InvalidOperationException ("ColumnMapping does not match");
// Core copy routine behind the public WriteToServer overloads: checks preconditions, fetches
// destination metadata, sends the "insert bulk" statement and then sends each eligible row
// value by value through TdsBulkCopy.
//   table - source rows and columns.
//   state - row-state filter; 0 means no filtering by state (deleted rows are always skipped).
// Throws InvalidOperationException when the connection is missing or closed, and
// ArgumentNullException when no destination table name has been set.
// NOTE(review): many lines of this method (declarations of `i` and `size`, else branches,
// counters, closing braces) are not visible in this view; the notes cover visible lines only.
351 private void BulkCopyToServer (DataTable table, DataRowState state)
353 if (connection == null || connection.State == ConnectionState.Closed)
354 throw new InvalidOperationException ("This method should not be called on a closed connection");
355 if (_destinationTableName == null)
356 throw new ArgumentNullException ("DestinationTableName");
// NOTE(review): identity_insert is issued for table.TableName (the source DataTable's name),
// while the bulk insert below targets DestinationTableName -- looks unintended, confirm.
// No assignment to identityInsert is visible in this view.
357 if (identityInsert) {
358 SqlCommand cmd = new SqlCommand ("set identity_insert " +
359 table.TableName + " on",
361 cmd.ExecuteScalar ();
// Element 0: column schema of the destination; element 1: collation rows.
363 DataTable [] columnMetaDataTables = GetColumnMetaData ();
364 DataTable colMetaData = columnMetaDataTables [0];
365 DataTable tableCollations = columnMetaDataTables [1];
// The first mapping decides whether all mappings are treated as ordinal-based.
// NOTE(review): no visible line resets ordinalMapping to false, so the flag may persist
// across calls on the same instance -- confirm.
367 if (_columnMappingCollection.Count > 0) {
368 if (_columnMappingCollection [0].SourceOrdinal != -1)
369 ordinalMapping = true;
370 ValidateColumnMapping (table, tableCollations);
// tmpCmd only serves as a holder for the parameters describing the destination columns.
373 SqlCommand tmpCmd = new SqlCommand ();
374 TdsBulkCopy blkCopy = new TdsBulkCopy ((Tds)connection.Tds);
// The statement is built and sent for TDS 7.0 and later; handling of older versions is not visible here.
375 if (((Tds)connection.Tds).TdsVersion >= TdsVersion.tds70) {
376 string statement = "insert bulk " + DestinationTableName + " (";
377 statement += GenerateColumnMetaData (tmpCmd, colMetaData, tableCollations);
379 blkCopy.SendColumnMetaData (statement);
381 blkCopy.BulkCopyStart (tmpCmd.Parameters.MetaParameters);
382 long noRowsCopied = 0;
383 foreach (DataRow row in table.Rows) {
384 if (row.RowState == DataRowState.Deleted)
385 continue; // Don't copy the row that's in deleted state
// Rows whose state differs from a non-zero filter are tested here; the action taken is
// not visible (presumably they are skipped).
386 if (state != 0 && row.RowState != state)
388 bool isNewRow = true;
// One value is sent per destination parameter.
390 foreach (SqlParameter param in tmpCmd.Parameters) {
392 object rowToCopy = null;
393 if (_columnMappingCollection.Count > 0) {
394 if (ordinalMapping) {
// Ordinal mapping: take the value from the mapped source ordinal and convert it when the
// source and destination type names differ.
395 foreach (SqlBulkCopyColumnMapping mapping
396 in _columnMappingCollection) {
397 if (mapping.DestinationOrdinal == i && param.Value == null) {
398 rowToCopy = row [mapping.SourceOrdinal];
399 SqlParameter parameter = new SqlParameter (mapping.SourceOrdinal.ToString (),
401 if (param.MetaParameter.TypeName != parameter.MetaParameter.TypeName) {
402 parameter.SqlDbType = param.SqlDbType;
403 rowToCopy = parameter.Value = parameter.ConvertToFrameworkType (rowToCopy);
405 string colType = string.Format ("{0}", parameter.MetaParameter.TypeName);
406 if (colType == "nvarchar") {
// NOTE(review): this tests row [i] although the value was read from row [mapping.SourceOrdinal];
// the two indexes can differ -- confirm which column is intended.
407 if (row [i] != null) {
408 size = ((string) parameter.Value).Length;
412 size = parameter.Size;
// Name mapping: same steps, keyed by source and destination column names.
418 foreach (SqlBulkCopyColumnMapping mapping
419 in _columnMappingCollection) {
420 if (mapping.DestinationColumn == param.ParameterName) {
421 rowToCopy = row [mapping.SourceColumn];
422 SqlParameter parameter = new SqlParameter (mapping.SourceColumn, rowToCopy);
423 if (param.MetaParameter.TypeName != parameter.MetaParameter.TypeName) {
424 parameter.SqlDbType = param.SqlDbType;
425 rowToCopy = parameter.Value = parameter.ConvertToFrameworkType (rowToCopy);
427 string colType = string.Format ("{0}", parameter.MetaParameter.TypeName);
428 if (colType == "nvarchar") {
429 if (row [mapping.SourceColumn] != null) {
430 size = ((string) rowToCopy).Length;
434 size = parameter.Size;
// No mappings: the source column is looked up by the destination parameter's name.
442 rowToCopy = row [param.ParameterName];
443 string colType = param.MetaParameter.TypeName;
// NOTE(review): the next two lines read as the text of a block comment whose delimiters
// are not visible in this view.
445 If column type is SqlDbType.NVarChar the size of parameter is multiplied by 2
446 FIXME: Need to check for other types
// NOTE(review): the cast below assumes the cell holds a string; a non-string cell value
// would make it throw -- confirm how such values are meant to be handled.
448 if (colType == "nvarchar") {
449 size = ((string) row [param.ParameterName]).Length;
// The action taken for a null value is not visible in this view.
455 if (rowToCopy == null)
457 blkCopy.BulkCopyData (rowToCopy, size, isNewRow);
460 } // foreach (SqlParameter)
// Notification: RowsCopied is called once the running count reaches NotifyAfter; the
// counter updates themselves are not visible in this view.
461 if (_notifyAfter > 0) {
463 if (noRowsCopied >= _notifyAfter) {
464 RowsCopied (noRowsCopied);
468 } // foreach (DataRow)
469 blkCopy.BulkCopyEnd ();
/// <summary>
/// Copies the given rows to the destination table. The rows are first copied into a
/// temporary DataTable that mirrors the column names and types of the first row's table.
/// </summary>
/// <param name="rows">Rows to copy; must not be null. An empty array is a no-op.</param>
/// <exception cref="ArgumentNullException"><paramref name="rows"/> is null.</exception>
public void WriteToServer (DataRow [] rows)
{
    if (rows == null)
        throw new ArgumentNullException ("rows");

    // Nothing to send. Without this guard the rows [0] access below fails with an
    // IndexOutOfRangeException on an empty array.
    if (rows.Length == 0)
        return;

    // The first row's table supplies the name and column layout of the staging table.
    DataTable sourceTable = rows [0].Table;
    DataTable table = new DataTable (sourceTable.TableName);
    foreach (DataColumn col in sourceTable.Columns) {
        DataColumn tmpCol = new DataColumn (col.ColumnName, col.DataType);
        table.Columns.Add (tmpCol);
    }

    foreach (DataRow row in rows) {
        DataRow tmpRow = table.NewRow ();
        for (int i = 0; i < table.Columns.Count; i++) {
            tmpRow [i] = row [i];
        }
        table.Rows.Add (tmpRow);
    }

    BulkCopyToServer (table, 0);
}
/// <summary>Copies all non-deleted rows of the given table to the destination table.</summary>
/// <param name="table">Source table; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="table"/> is null.</exception>
public void WriteToServer (DataTable table)
{
    // Fail up front, as the other public entry points do, instead of dereferencing a
    // null table later, after commands have already been sent to the server.
    if (table == null)
        throw new ArgumentNullException ("table");

    // A row state of 0 means rows are not filtered by state.
    BulkCopyToServer (table, 0);
}
/// <summary>
/// Copies the rows supplied by a data reader to the destination table. The reader's
/// content is first loaded into a temporary DataTable.
/// </summary>
/// <param name="reader">Source reader; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="reader"/> is null.</exception>
public void WriteToServer (IDataReader reader)
{
    // Reject null explicitly, matching the argument checks of the other public entry points.
    if (reader == null)
        throw new ArgumentNullException ("reader");

    DataTable table = new DataTable ();
    SqlDataAdapter adapter = new SqlDataAdapter ();
    adapter.FillInternal (table, reader);

    // A row state of 0 means rows are not filtered by state.
    BulkCopyToServer (table, 0);
}
/// <summary>
/// Copies the rows of the given table that are in the requested row state to the
/// destination table. Deleted rows are never copied.
/// </summary>
/// <param name="table">Source table; must not be null.</param>
/// <param name="rowState">Row state to select; 0 disables filtering by state.</param>
/// <exception cref="ArgumentNullException"><paramref name="table"/> is null.</exception>
public void WriteToServer (DataTable table, DataRowState rowState)
{
    // Fail up front, as the other public entry points do, instead of dereferencing a
    // null table later, after commands have already been sent to the server.
    if (table == null)
        throw new ArgumentNullException ("table");

    BulkCopyToServer (table, rowState);
}
/// <summary>Raises SqlRowsCopied with the given row count, if anyone is subscribed.</summary>
/// <param name="rowsCopied">Row count passed on to the event arguments.</param>
private void RowsCopied (long rowsCopied)
{
    SqlRowsCopiedEventArgs args = new SqlRowsCopiedEventArgs (rowsCopied);

    if (SqlRowsCopied == null)
        return;

    SqlRowsCopied (this, args);
}
/// <summary>
/// Raised by RowsCopied, which the copy loop calls when NotifyAfter is greater than zero
/// and the running row count has reached it. Handlers receive a SqlRowsCopiedEventArgs
/// built from that row count.
/// </summary>
521 public event SqlRowsCopiedEventHandler SqlRowsCopied;
525 void IDisposable.Dispose ()
527 //throw new NotImplementedException ();
528 if (isLocalConnection) {