WSO2 EI 6.2 中的并行处理
Paralel processing in WSO2 EI 6.2
我正在开发需要调用多个端点的健康检查 API。这个想法是,如果这些 EP 中的任何一个失败,我必须捕获错误并将其合并到响应中。我想创建一个调用其他叶子序列的主序列,每个叶子序列都有自己的故障序列。
这个图应该让事情更清楚:
seqA (faultSsqA)
/
mainSeq - seqB (faultSeqB)
\ seqC (faultSeqC)
如果所有序列处理成功,一切顺利,我能够处理响应,但是,如果叶序列之一出现错误,则执行相应的错误序列,但我不知道如何 return 将执行流程返回到主序列。我想要的是,如果序列B发生异常,则执行故障序列B,然后执行流程回到主序列,再回到序列C。
如果我可以并行执行所有叶序列,并且最后只收集所有结果,那就更好了。我知道迭代调解器会这样做,但据我了解,语义与我想要的不一样。
有人对此有想法吗?
编辑:看起来克隆调解器就是我要找的东西,但是我仍然无法让它按照我需要的方式工作。由于我将连接到不同的平台,因此响应格式是不同的。此外,我需要处理超时、500 响应代码等情况。因此,我不能简单地从每个目标调用一个端点 - 对于每个后端平台,我创建了一个执行所有这些逻辑的序列,并填充一些属性,以便我可以填充最终响应("platform A = up",等等)。
我的期望是所有目标序列都将并行处理
这是我的主序列。我使用克隆中介并将其定位到 2 个不同的序列,它们执行 EP 的实际调用并处理响应(我现在只使用 2 个,但将来会更多)。序列还会更新一些变量以反映后端平台的状态("platform A = up",等等)。
<sequence name="inSequence_healthCheck" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
<clone id="healthCHeck">
<target>
<sequence>
<sequence key="healthCheck_wacs"/>
</sequence>
</target>
<target>
<sequence>
<sequence key="healthCheck_thesys"/>
</sequence>
</target>
</clone>
<loopback/>
</sequence>
这是目标序列之一(现在很简单,但里面应该有更多的逻辑)。请注意,我根据响应填充 属性 wacsStatus。
<sequence name="healthCheck_wacs" onError="faultSequence_healthCheck_wacs" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
<call>
<endpoint key="gov:ClientEquipments/endpoints/WACS/wacs_healthCheck.endpoint.xml"/>
</call>
<switch source="$axis2:HTTP_SC">
<case regex="2\d\d|4\d\d">
<property name="wacsStatus" scope="default" type="BOOLEAN" value="true"/>
</case>
<default>
<property name="wacsStatus" scope="default" type="BOOLEAN" value="false"/>
</default>
</switch>
</sequence>
这是我的 outSequence,用于聚合:
<sequence name="outSequence_healthCheck" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
<log>
<property name="step" value="START outSequence_healthCheck"/>
<property expression="$ctx:wacsStatus" name="wacsStatus"/>
<property expression="$ctx:thesysStatus" name="thesysStatus"/>
<property expression="$ctx:phStatus" name="phStatus"/>
<property expression="$ctx:naStatus" name="naStatus"/>
</log>
<property name="info" scope="default">
<ns:Information xmlns:ns="http://wso2.com"/>
</property>
<aggregate id="healthCHeck">
<completeCondition>
<messageCount max="-1" min="2"/>
</completeCondition>
<onComplete enclosingElementProperty="info" expression="s11:Body/child::* | s12:Body/child::*"
xmlns:m0="http://services.samples" xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:s12="http://www.w3.org/2003/05/soap-envelope">
<log level="full"/>
</onComplete>
</aggregate>
<property name="messageType" scope="axis2" type="STRING" value="application/json"/>
<payloadFactory media-type="json">
<format>
{
"WACS" : "",
"thesys" : ""
}
</format>
<args>
<arg evaluator="xml" expression="$ctx:wacsStatus"/>
<arg evaluator="xml" expression="$ctx:thesysStatus"/>
</args>
</payloadFactory>
<property name="messageType" scope="axis2" type="STRING" value="application/json"/>
<property name="HTTP_SC" scope="axis2" type="STRING" value="200"/>
<respond/>
</sequence>
我的期望是聚合中介将等待所有目标序列执行,到那时所有属性将被正确填充并且我可以使用 payloadFactory 正确格式化响应。但是,从日志中我可以看到,即使聚合确实聚合了来自目标序列的所有响应,属性也没有正确填充,因此响应不正确。
谢谢。
佩德罗
您应该尝试迭代 mediator/clone 和聚合中介(分散-聚集模式)构造。迭代允许您并行执行多个序列,而聚合将收集响应。您可以让单独的序列处理故障处理和 return 预期的响应或某些故障。然后您可以检查聚合结果是否有任何错误。
以下是一些示例:
Yenlo Blog
WSO2 Architecture Team
我正在开发需要调用多个端点的健康检查 API。这个想法是,如果这些 EP 中的任何一个失败,我必须捕获错误并将其合并到响应中。我想创建一个调用其他叶子序列的主序列,每个叶子序列都有自己的故障序列。 这个图应该让事情更清楚:
seqA (faultSsqA)
/
mainSeq - seqB (faultSeqB)
\ seqC (faultSeqC)
如果所有序列处理成功,一切顺利,我能够处理响应,但是,如果叶序列之一出现错误,则执行相应的错误序列,但我不知道如何 return 将执行流程返回到主序列。我想要的是,如果序列B发生异常,则执行故障序列B,然后执行流程回到主序列,再回到序列C。 如果我可以并行执行所有叶序列,并且最后只收集所有结果,那就更好了。我知道迭代调解器会这样做,但据我了解,语义与我想要的不一样。 有人对此有想法吗?
编辑:看起来克隆调解器就是我要找的东西,但是我仍然无法让它按照我需要的方式工作。由于我将连接到不同的平台,因此响应格式是不同的。此外,我需要处理超时、500 响应代码等情况。因此,我不能简单地从每个目标调用一个端点 - 对于每个后端平台,我创建了一个执行所有这些逻辑的序列,并填充一些属性,以便我可以填充最终响应("platform A = up",等等)。 我的期望是所有目标序列都将并行处理
这是我的主序列。我使用克隆中介并将其定位到 2 个不同的序列,它们执行 EP 的实际调用并处理响应(我现在只使用 2 个,但将来会更多)。序列还会更新一些变量以反映后端平台的状态("platform A = up",等等)。
<sequence name="inSequence_healthCheck" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
<clone id="healthCHeck">
<target>
<sequence>
<sequence key="healthCheck_wacs"/>
</sequence>
</target>
<target>
<sequence>
<sequence key="healthCheck_thesys"/>
</sequence>
</target>
</clone>
<loopback/>
</sequence>
这是目标序列之一(现在很简单,但里面应该有更多的逻辑)。请注意,我根据响应填充 属性 wacsStatus。
<sequence name="healthCheck_wacs" onError="faultSequence_healthCheck_wacs" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
<call>
<endpoint key="gov:ClientEquipments/endpoints/WACS/wacs_healthCheck.endpoint.xml"/>
</call>
<switch source="$axis2:HTTP_SC">
<case regex="2\d\d|4\d\d">
<property name="wacsStatus" scope="default" type="BOOLEAN" value="true"/>
</case>
<default>
<property name="wacsStatus" scope="default" type="BOOLEAN" value="false"/>
</default>
</switch>
</sequence>
这是我的 outSequence,用于聚合:
<sequence name="outSequence_healthCheck" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
<log>
<property name="step" value="START outSequence_healthCheck"/>
<property expression="$ctx:wacsStatus" name="wacsStatus"/>
<property expression="$ctx:thesysStatus" name="thesysStatus"/>
<property expression="$ctx:phStatus" name="phStatus"/>
<property expression="$ctx:naStatus" name="naStatus"/>
</log>
<property name="info" scope="default">
<ns:Information xmlns:ns="http://wso2.com"/>
</property>
<aggregate id="healthCHeck">
<completeCondition>
<messageCount max="-1" min="2"/>
</completeCondition>
<onComplete enclosingElementProperty="info" expression="s11:Body/child::* | s12:Body/child::*"
xmlns:m0="http://services.samples" xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:s12="http://www.w3.org/2003/05/soap-envelope">
<log level="full"/>
</onComplete>
</aggregate>
<property name="messageType" scope="axis2" type="STRING" value="application/json"/>
<payloadFactory media-type="json">
<format>
{
"WACS" : "",
"thesys" : ""
}
</format>
<args>
<arg evaluator="xml" expression="$ctx:wacsStatus"/>
<arg evaluator="xml" expression="$ctx:thesysStatus"/>
</args>
</payloadFactory>
<property name="messageType" scope="axis2" type="STRING" value="application/json"/>
<property name="HTTP_SC" scope="axis2" type="STRING" value="200"/>
<respond/>
</sequence>
我的期望是聚合中介将等待所有目标序列执行,到那时所有属性将被正确填充并且我可以使用 payloadFactory 正确格式化响应。但是,从日志中我可以看到,即使聚合确实聚合了来自目标序列的所有响应,属性也没有正确填充,因此响应不正确。
谢谢。
佩德罗
您应该尝试迭代 mediator/clone 和聚合中介(分散-聚集模式)构造。迭代允许您并行执行多个序列,而聚合将收集响应。您可以让单独的序列处理故障处理和 return 预期的响应或某些故障。然后您可以检查聚合结果是否有任何错误。
以下是一些示例: Yenlo Blog WSO2 Architecture Team