I am working with the Data Flow Task Script Component for the first time. I have created a second output, and in my script I add rows to this output.
I have found that SSIS does not release those rows to the second output until it has processed all of the incoming pipeline records. This will not work for me, as there are going to be a few million records coming down the pipe, so I need the Script Component to release these records downstream as soon as possible for insert into the OLE DB destination component.
Any help would be greatly appreciated.
Hmm... I could not replicate. How are you sending data to the 2nd output?|||
The only way I could figure out to add a row to the dataset in code was to set the output's SynchronousInputID property to "None" and reference the output buffer, which gave me access to the OutputBuffer.AddRow() method.
The problem is that the Script Component then tries to finish processing the entire pipeline before any rows proceed beyond it.
Is there a way to access the AddRow() method when the Output has the Input set as the SynchronousInput?
Here is my Script Component Code:
Code Snippet
Imports System
Imports System.Data
Imports System.Math
Imports System.Xml
Imports System.Windows.Forms
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper

Public Class ScriptMain
    Inherits UserComponent

    Private _Name As String
    Private _Value As String
    Private _Paco As String
    Private _MessageId As Guid
    Private _MessageStreamId As Guid
    Private _LogDateTimeStamp As DateTime

    Public Sub AddRow(ByVal message As String, ByVal row As Input1Buffer, ByVal xPathQuery As String, ByVal adduri As String, ByVal xPathPaco As String, ByVal removeuri As String)
        If Not message Is Nothing Then
            Dim doc As XmlDataDocument = New XmlDataDocument
            Dim aXmlNode As XmlNode
            doc.PreserveWhitespace = False
            doc.LoadXml(message)
            Dim namespaceManager As XmlNamespaceManager = New XmlNamespaceManager(doc.NameTable)
            If Not removeuri Is Nothing Then namespaceManager.RemoveNamespace("ns0", removeuri)
            namespaceManager.AddNamespace("ns0", adduri)
            aXmlNode = doc.SelectSingleNode(xPathQuery, namespaceManager)
            If Not aXmlNode Is Nothing Then
                If aXmlNode.InnerText <> "0" Then
                    _Name = aXmlNode.Name
                    _Value = aXmlNode.InnerText
                    _MessageId = row.MessageID
                    _MessageStreamId = row.MessageStreamID
                    _LogDateTimeStamp = row.LogDateTimeStamp
                    aXmlNode = doc.SelectSingleNode(xPathPaco, namespaceManager)
                    If Not aXmlNode Is Nothing Then _Paco = aXmlNode.InnerText
                    CreateNewOutputRows()
                End If
            End If
        End If
    End Sub

    Public Overrides Sub CreateNewOutputRows()
        MyBase.CreateNewOutputRows()
        With Output2Buffer
            .AddRow()
            .Name = _Name
            .Value = _Value
            .MessageId = _MessageId
            .MessageStreamId = _MessageStreamId
            .LogDateTimeStamp = _LogDateTimeStamp
            .Paco = _Paco
        End With
    End Sub

    Public Overrides Sub Input1_ProcessInputRow(ByVal row As Input1Buffer)
        AddRow(row.StringMessageBody, row, row.xPathPrId, row.xPathPrNamespace, row.xPathPrPaco, Nothing)
        AddRow(row.StringResponseMessageBody, row, row.xPathPrId, row.xPathPrNamespace, row.xPathPrPaco, Nothing)
        AddRow(row.StringMessageBody, row, row.xPathPpId, row.xPathPpNamespace, row.xPathPpPaco, row.xPathPrNamespace)
        AddRow(row.StringResponseMessageBody, row, row.xPathPpId, row.xPathPpNamespace, row.xPathPpPaco, Nothing)
        AddRow(row.StringMessageBody, row, row.xPathConId, row.xPathConNamespace, row.xPathConPaco, row.xPathPpNamespace)
        AddRow(row.StringResponseMessageBody, row, row.xPathConId, row.xPathConNamespace, row.xPathConPaco, Nothing)
        AddRow(row.StringMessageBody, row, row.xPathEmailId, row.xPathEmailNamespace, row.xPathConPaco, row.xPathConNamespace)
        AddRow(row.StringResponseMessageBody, row, row.xPathEmailId, row.xPathEmailNamespace, row.xPathConPaco, Nothing)
        AddRow(row.StringMessageBody, row, row.xPathPhoneId, row.xPathPhoneNamespace, row.xPathConPaco, row.xPathEmailNamespace)
        AddRow(row.StringResponseMessageBody, row, row.xPathPhoneId, row.xPathPhoneNamespace, row.xPathConPaco, Nothing)
        AddRow(row.StringMessageBody, row, row.xPathAddrId, row.xPathAddrNamespace, row.xPathConPaco, row.xPathPhoneNamespace)
        AddRow(row.StringResponseMessageBody, row, row.xPathAddrId, row.xPathAddrNamespace, row.xPathConPaco, Nothing)
        AddRow(row.StringMessageBody, row, row.xPathPtId, row.xPathPtNamespace, row.xPathConPaco, row.xPathAddrNamespace)
        AddRow(row.StringResponseMessageBody, row, row.xPathPtId, row.xPathPtNamespace, row.xPathConPaco, Nothing)
    End Sub
End Class
|||You are correct that your component should be in asynchronous mode and that you should be using AddRow. You should not, however, be calling CreateNewOutputRows the way you are; the engine will call that on its own. From what I can tell of your logic, you don't need to implement it. Just move the logic you currently have in CreateNewOutputRows to where you are calling it. The AddRow method can be called from anywhere in the script, not just CreateNewOutputRows. As it is, one of your rows will be duplicated when the engine makes its call.
Regarding your concern about the Script Component wanting to complete the entire pipeline, you need to understand that rows do not move through the pipeline, only buffers do, and buffers consist of about 10,000 rows by default. Buffers are only ejected from a component when they are full, or when the pipeline is empty. I see you have some logic filtering the incoming rows. I don't know how many incoming rows you have, or how stringent the filtering is, but if your output is less than a full buffer, then yes, it will wait until the source is finished.
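To make the "full buffer or end of input" behavior concrete, here is a toy model in plain VB.NET. It is only an illustration of the idea described above, not the SSIS engine's actual implementation; ToyBuffer, its members, and the row counts are hypothetical names and numbers chosen for the sketch.

```vbnet
Imports System
Imports System.Collections.Generic

Module BufferSketch
    ' Toy stand-in for a pipeline buffer. Hypothetical; not the SSIS engine's code.
    Class ToyBuffer
        Private ReadOnly _maxRows As Integer
        Private ReadOnly _rows As New List(Of String)
        Public FlushCount As Integer = 0

        Public Sub New(ByVal maxRows As Integer)
            _maxRows = maxRows
        End Sub

        ' Adds a row; the buffer is handed downstream only once it is full.
        Public Sub AddRow(ByVal value As String)
            _rows.Add(value)
            If _rows.Count >= _maxRows Then Flush()
        End Sub

        ' Releases whatever is in the buffer (full buffer, or end of input).
        Public Sub Flush()
            If _rows.Count = 0 Then Return
            FlushCount += 1
            Console.WriteLine("Flush {0}: {1} rows", FlushCount, _rows.Count)
            _rows.Clear()
        End Sub
    End Class

    Sub Main()
        Dim buffer As New ToyBuffer(10000)
        For i As Integer = 1 To 25000
            buffer.AddRow("row " & i.ToString())
        Next
        buffer.Flush() ' end of input: the partial buffer is released
    End Sub
End Module
```

With 25,000 rows and a 10,000-row limit, this prints two flushes of 10,000 rows and a final flush of 5,000. With fewer than 10,000 output rows in total, nothing would be released until the final Flush, which matches the symptom described in the question.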
|||Are you only wanting one row coming out of output 2? Or do you want a row for every input row?|||
Can I set the count of rows to cause the buffer to push the data downstream?
Each row that comes in contains an Xml document from which I am harvesting certain node values and sticking them into a structured table.
I originally did not use the CreateNewOutputRows. This was one attempt I made at trying to force the buffer data down through the pipeline. I am encountering the same results with or without the use of CreateNewOutputRows.
Again, my experience was that the Script Component was not pushing the output down the pipeline. My fear was, and is, that when I am processing millions of rows it will fill up the SSIS engine's memory before it pushes data down the pipeline.
Perhaps if I could set the count at which I want it to push data down the pipeline? Is that possible?
|||Calling AddRow() on the 2nd output in "Public Overrides Sub Input1_ProcessInputRow" should send rows to the 2nd output without waiting for the 1st output. That's what my tests yield.|||
Dotnet Fellow wrote:
Can I set the count of rows to cause the buffer to push the data downstream?
Yes, you can change the number of rows that go on a buffer. This will cause your buffers to fill and be ejected from your script earlier. I don't recommend it or think it is necessary, though. By default, buffers are 10,000 rows or 10 MB, whichever comes first, so memory should not be a concern. The DefaultBufferMaxRows property of the Data Flow task controls the number of rows allowed on a buffer.
If you really expect to be running a million rows through this code, you may want to consider changing it so the XML documents are only loaded once for each row, instead of once for each of the XPath nodes you are extracting. I count 14 document loads per row, when probably only 2 are necessary.
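A rough sketch of the "load once, query many times" idea, written as a standalone VB.NET console module so it can be tried outside SSIS. The sample XML, the namespace URIs, the prefixes "a" and "b", and the ReadValue helper are all made up for illustration. The posted script re-binds a single "ns0" prefix per query, so it would need either distinct prefixes as assumed here or one namespace manager per namespace.

```vbnet
Imports System
Imports System.Xml

Module XPathSketch
    ' Hypothetical helper: returns the InnerText of the first node matching
    ' xPath, or Nothing when no node matches.
    Function ReadValue(ByVal doc As XmlDocument, ByVal ns As XmlNamespaceManager, ByVal xPath As String) As String
        Dim node As XmlNode = doc.SelectSingleNode(xPath, ns)
        If node Is Nothing Then Return Nothing
        Return node.InnerText
    End Function

    Sub Main()
        ' Made-up sample message standing in for one row's XML column.
        Dim message As String = "<m><a:id xmlns:a='urn:example:a'>17</a:id><b:id xmlns:b='urn:example:b'>42</b:id></m>"

        ' Parse the document once per row...
        Dim doc As New XmlDocument()
        doc.LoadXml(message)

        ' ...register every namespace up front under its own prefix...
        Dim ns As New XmlNamespaceManager(doc.NameTable)
        ns.AddNamespace("a", "urn:example:a")
        ns.AddNamespace("b", "urn:example:b")

        ' ...then run as many XPath queries as needed against the same document.
        Console.WriteLine(ReadValue(doc, ns, "/m/a:id"))
        Console.WriteLine(ReadValue(doc, ns, "/m/b:id"))
        Console.WriteLine(ReadValue(doc, ns, "/m/c") Is Nothing)
    End Sub
End Module
```

This prints 17, 42, and True. In the Script Component, the same pattern would mean one LoadXml call for the message body and one for the response body per input row, with the seven lookups run against each parsed document.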