Monday, March 12, 2012

How do I make Script Component output 2 asynchronous?

I am working with the Data Flow Task Script Component for the first time. I have created a second Output. In my script I add rows to this output.

I have found that SSIS does not release those rows to the second output until it has processed all of the incoming pipeline records. This will not work for me, as a few million records will be coming down the pipe, so I need the Script Component to release these records downstream as soon as possible for insertion by the OLE DB destination component.

Any help would be greatly appreciated.

Hmm... I could not replicate.

How are you sending data to the 2nd output?|||

The only way I could figure out to add a row to the dataset in code was to set the output's SynchronousInputID property to "None" and reference the output buffer; then I could call the output buffer's AddRow() method.

The problem is that then it tries to complete processing the entire pipeline before proceeding beyond the Script Component.

Is there a way to access the AddRow() method when the Output has the Input set as the SynchronousInput?

Here is my Script Component Code:

Code Snippet

Imports System
Imports System.Data
Imports System.Math
Imports System.Xml
Imports System.Windows.Forms
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper

Public Class ScriptMain
    Inherits UserComponent

    Private _Name As String
    Private _Value As String
    Private _Paco As String
    Private _MessageId As Guid
    Private _MessageStreamId As Guid
    Private _LogDateTimeStamp As DateTime

    Public Sub AddRow(ByVal message As String, ByVal row As Input1Buffer, ByVal xPathQuery As String, ByVal adduri As String, ByVal xPathPaco As String, ByVal removeuri As String)
        If Not message Is Nothing Then
            Dim doc As XmlDataDocument = New XmlDataDocument
            Dim aXmlNode As XmlNode
            doc.PreserveWhitespace = False
            doc.LoadXml(message)
            Dim namespaceManager As XmlNamespaceManager = New XmlNamespaceManager(doc.NameTable)
            If Not removeuri Is Nothing Then namespaceManager.RemoveNamespace("ns0", removeuri)
            namespaceManager.AddNamespace("ns0", adduri)
            aXmlNode = doc.SelectSingleNode(xPathQuery, namespaceManager)
            If Not aXmlNode Is Nothing Then
                If aXmlNode.InnerText <> "0" Then
                    _Name = aXmlNode.Name
                    _Value = aXmlNode.InnerText
                    _MessageId = row.MessageID
                    _MessageStreamId = row.MessageStreamID
                    _LogDateTimeStamp = row.LogDateTimeStamp
                    aXmlNode = doc.SelectSingleNode(xPathPaco, namespaceManager)
                    If Not aXmlNode Is Nothing Then _Paco = aXmlNode.InnerText
                    CreateNewOutputRows()
                End If
            End If
        End If
    End Sub

    Public Overrides Sub CreateNewOutputRows()
        MyBase.CreateNewOutputRows()
        With Output2Buffer
            .AddRow()
            .Name = _Name
            .Value = _Value
            .MessageId = _MessageId
            .MessageStreamId = _MessageStreamId
            .LogDateTimeStamp = _LogDateTimeStamp
            .Paco = _Paco
        End With
    End Sub

    Public Overrides Sub Input1_ProcessInputRow(ByVal row As Input1Buffer)
        AddRow(row.StringMessageBody, row, row.xPathPrId, row.xPathPrNamespace, row.xPathPrPaco, Nothing)
        AddRow(row.StringResponseMessageBody, row, row.xPathPrId, row.xPathPrNamespace, row.xPathPrPaco, Nothing)
        AddRow(row.StringMessageBody, row, row.xPathPpId, row.xPathPpNamespace, row.xPathPpPaco, row.xPathPrNamespace)
        AddRow(row.StringResponseMessageBody, row, row.xPathPpId, row.xPathPpNamespace, row.xPathPpPaco, Nothing)
        AddRow(row.StringMessageBody, row, row.xPathConId, row.xPathConNamespace, row.xPathConPaco, row.xPathPpNamespace)
        AddRow(row.StringResponseMessageBody, row, row.xPathConId, row.xPathConNamespace, row.xPathConPaco, Nothing)
        AddRow(row.StringMessageBody, row, row.xPathEmailId, row.xPathEmailNamespace, row.xPathConPaco, row.xPathConNamespace)
        AddRow(row.StringResponseMessageBody, row, row.xPathEmailId, row.xPathEmailNamespace, row.xPathConPaco, Nothing)
        AddRow(row.StringMessageBody, row, row.xPathPhoneId, row.xPathPhoneNamespace, row.xPathConPaco, row.xPathEmailNamespace)
        AddRow(row.StringResponseMessageBody, row, row.xPathPhoneId, row.xPathPhoneNamespace, row.xPathConPaco, Nothing)
        AddRow(row.StringMessageBody, row, row.xPathAddrId, row.xPathAddrNamespace, row.xPathConPaco, row.xPathPhoneNamespace)
        AddRow(row.StringResponseMessageBody, row, row.xPathAddrId, row.xPathAddrNamespace, row.xPathConPaco, Nothing)
        AddRow(row.StringMessageBody, row, row.xPathPtId, row.xPathPtNamespace, row.xPathConPaco, row.xPathAddrNamespace)
        AddRow(row.StringResponseMessageBody, row, row.xPathPtId, row.xPathPtNamespace, row.xPathConPaco, Nothing)
    End Sub

End Class

|||You are correct that your component should be in asynchronous mode and that you should be using AddRow. You should not, however, be calling CreateNewOutputRows the way you are. The engine will call that method on its own. From what I can tell of your logic, you don't need to implement it at all. Just move the logic you currently have in CreateNewOutputRows to the place where you are calling it. The AddRow method can be called from anywhere in the script, not just from CreateNewOutputRows. As it is, the engine's own call will add an extra, unintended row to your output.
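To make that concrete, here is a rough sketch of how the script might look with the row-building logic moved out of CreateNewOutputRows. It is not a tested drop-in: Input1Buffer, Output2Buffer, UserComponent, and the column names are the ones from the posted script and only exist inside that package's Script Component. The helper name ExtractNode is hypothetical. The sketch also makes two assumptions worth checking: plain XmlDocument is enough (nothing in the posted code appears to need XmlDataDocument), and the removeuri argument can be dropped, because a new XmlNamespaceManager is created on every call and so has no earlier "ns0" mapping to remove.

```vbnet
' Sketch only: compiles only inside the Script Component from the thread,
' where the designer generates UserComponent, Input1Buffer and Output2Buffer.
Imports System
Imports System.Xml

Public Class ScriptMain
    Inherits UserComponent

    ' No CreateNewOutputRows override here: the engine calls that method
    ' itself, and this component only emits rows in response to input rows.

    Public Overrides Sub Input1_ProcessInputRow(ByVal row As Input1Buffer)
        ExtractNode(row.StringMessageBody, row, row.xPathPrId, row.xPathPrNamespace, row.xPathPrPaco)
        ExtractNode(row.StringResponseMessageBody, row, row.xPathPrId, row.xPathPrNamespace, row.xPathPrPaco)
        ' The remaining calls from the original script follow the same pattern.
    End Sub

    ' Hypothetical helper replacing the original AddRow/CreateNewOutputRows pair.
    Private Sub ExtractNode(ByVal message As String, ByVal row As Input1Buffer, ByVal xPathQuery As String, ByVal namespaceUri As String, ByVal xPathPaco As String)
        If message Is Nothing Then Return

        Dim doc As New XmlDocument()
        doc.LoadXml(message)

        Dim ns As New XmlNamespaceManager(doc.NameTable)
        ns.AddNamespace("ns0", namespaceUri)

        Dim node As XmlNode = doc.SelectSingleNode(xPathQuery, ns)
        If node Is Nothing OrElse node.InnerText = "0" Then Return

        Dim pacoNode As XmlNode = doc.SelectSingleNode(xPathPaco, ns)

        ' Add the output row right here, while processing the input row.
        With Output2Buffer
            .AddRow()
            .Name = node.Name
            .Value = node.InnerText
            .MessageId = row.MessageID
            .MessageStreamId = row.MessageStreamID
            .LogDateTimeStamp = row.LogDateTimeStamp
            ' Unlike the original, Paco is left unset when the node is missing,
            ' instead of reusing the value kept from an earlier row.
            If pacoNode IsNot Nothing Then .Paco = pacoNode.InnerText
        End With
    End Sub
End Class
```

Using local variables instead of the class-level fields also means a value from one input row cannot leak into the output row built for another.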

Regarding your concern about the Script Component wanting to complete the entire pipeline, you need to understand that rows do not move through the pipeline; only buffers do, and a buffer holds about 10,000 rows by default. A buffer is only ejected from a component when it is full or when the pipeline is empty. I see you have some logic filtering the incoming rows. I don't know how many incoming rows you have, or how stringent the filtering is, but if your output is less than a full buffer, then yes, it will wait until the source is finished.
|||Are you only wanting one row coming out of output 2? Or do you want a row for every input row?|||

Can I set the count of rows to cause the buffer to push the data downstream?

Each row that comes in contains an XML document from which I am harvesting certain node values and sticking them into a structured table.

I originally did not use CreateNewOutputRows. Calling it was one attempt I made at forcing the buffer data down through the pipeline. I am encountering the same results with or without CreateNewOutputRows.

Again, my experience was that the Script Component was not pushing the output down the pipeline. My fear was, and still is, that when I am processing millions of rows it will fill up the SSIS engine's memory before it pushes data down the pipeline.

Perhaps if I could set the count at which I want it to push data down the pipeline? Is that possible?

|||Calling AddRow() on the 2nd output in "Public Overrides Sub Input1_ProcessInputRow" should send rows to the 2nd output without waiting for the 1st output. That's what my tests yield.|||

Yes, you can change the number of rows that go on a buffer. This will cause your buffers to fill and be ejected from your script earlier. I don't recommend it or think it is necessary, though. By default, a buffer holds 10,000 rows or 10 MB, whichever limit is reached first, so memory should not be a concern. The DefaultBufferMaxRows property of the Data Flow Task controls the number of rows allowed on a buffer, and DefaultBufferSize controls the size limit.
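As a back-of-the-envelope illustration of "whichever limit is reached first", the following standalone sketch estimates how many rows fit on one buffer. It is only an approximation under stated assumptions: the engine's real sizing is more involved, the function name EstimateRowsPerBuffer is made up for this example, and the 200-byte and 4,000-byte row widths are hypothetical figures, not measurements from your package.

```vbnet
Imports System

Module BufferEstimate
    ' Rough estimate: the smaller of the row cap and the number of
    ' rows that fit within the byte cap (integer division).
    Function EstimateRowsPerBuffer(ByVal maxRows As Integer, ByVal bufferBytes As Integer, ByVal rowBytes As Integer) As Integer
        Return Math.Min(maxRows, bufferBytes \ rowBytes)
    End Function

    Sub Main()
        Dim defaultMaxRows As Integer = 10000              ' default row cap
        Dim defaultBufferBytes As Integer = 10 * 1024 * 1024 ' default 10 MB cap

        ' Hypothetical narrow row: the row cap is reached first.
        Console.WriteLine(EstimateRowsPerBuffer(defaultMaxRows, defaultBufferBytes, 200))  ' prints 10000

        ' Hypothetical wide row: the size cap is reached first.
        Console.WriteLine(EstimateRowsPerBuffer(defaultMaxRows, defaultBufferBytes, 4000)) ' prints 2621
    End Sub
End Module
```

Either way the buffer is bounded, which is why a slow trickle of output rows delays delivery downstream but does not by itself exhaust memory.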

If you really expect to be running a million rows through this code, you may want to consider changing it so that each XML document is loaded only once per row, instead of once for each of the XPath nodes you are extracting. I count 14 document loads per row, when probably only 2 are necessary.
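Here is a minimal standalone sketch of the load-once idea: parse the XML a single time, then run every XPath query against the same document. The module name, the ExtractAll function, and the sample XML are all invented for illustration. In your component the two documents would be StringMessageBody and StringResponseMessageBody, each parsed once and queried seven times.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Xml

Module XPathOnce
    ' Parses messageXml once, then evaluates each (namespace URI, XPath) pair
    ' against that single document. Each query maps the "ns0" prefix to its
    ' own namespace URI, as the posted script does.
    Function ExtractAll(ByVal messageXml As String, ByVal queries As IEnumerable(Of KeyValuePair(Of String, String))) As List(Of String)
        Dim results As New List(Of String)
        If String.IsNullOrEmpty(messageXml) Then Return results

        Dim doc As New XmlDocument()
        doc.LoadXml(messageXml) ' the only parse

        For Each q As KeyValuePair(Of String, String) In queries
            Dim ns As New XmlNamespaceManager(doc.NameTable)
            ns.AddNamespace("ns0", q.Key)
            Dim node As XmlNode = doc.SelectSingleNode(q.Value, ns)
            If node IsNot Nothing Then results.Add(node.InnerText)
        Next
        Return results
    End Function

    Sub Main()
        ' Invented sample document with two namespaces.
        Dim sample As String = "<m xmlns:a='urn:a' xmlns:b='urn:b'><a:id>7</a:id><b:id>9</b:id></m>"

        Dim queries As New List(Of KeyValuePair(Of String, String))
        queries.Add(New KeyValuePair(Of String, String)("urn:a", "/m/ns0:id"))
        queries.Add(New KeyValuePair(Of String, String)("urn:b", "/m/ns0:id"))

        For Each value As String In ExtractAll(sample, queries)
            Console.WriteLine(value) ' prints 7, then 9
        Next
    End Sub
End Module
```

Creating a new XmlNamespaceManager per query is cheap compared with re-parsing the document, and it avoids having to remove the previous "ns0" mapping before adding the next one.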
