当多个 KieBase 并行执行时,为什么 Drools 执行的规则不正确?

Why Drools is executing incorrect rules when multiple KieBase are executed in parallel?

当多个进程并行 运行 并且在每个进程中,每次都会创建和处理一个新的 KieBase 对象时,Drools 似乎会给出不正确的结果。

尝试过以下版本:6.5.0.Final7.32.0.Final

详情:

我并行执行了 120 个任务(使用 7 个线程)。在这 120 个任务中,drools 给出了 108 个任务的正确结果,但对 12 个任务执行了错误的规则(每个任务中失败的任务数量各不相同 运行)。

让我post这里的代码和输出:

    public class TempClass {
        public List<String> droolLogging = new ArrayList<>();
    }

   public void execute(){
        Map<String, List<String>> failedTasks = new ConcurrentHashMap<>(); // to see which tasks were incorrectly executed

        // Run 120 tasks in parallel using x threads (x depends upon no of processor)

        IntStream.range(1, 120).parallel()
        .forEach(taskCounter -> {

            String uniqueId = "Task-"+taskCounter;

            TempClass classObj = new TempClass();

            String ruleString = "package com.sample" + taskCounter + "\n" +
                    "import com.TempClass\n" +
                    "\n" +\
                    "rule \"droolLogging"+taskCounter+"\"\n" +
                    "\t when \n" +
                    "\t\t obj: TempClass(true)\n" +
                    "\t then \n" +
                    "\t\t obj.droolLogging.add(\"RuleOf-"+uniqueId+"\");\n" +
                    "\t end\n";

            // Above ruleString contains 1 rule and it is always executed. 
            // After execution, it will add an entry in array list 'droolLogging' 
            // of class 'TempClass'. In this entry, we are storing task counter 
            // to see rule of which task is executed.


            //following line of code seems to be the culprit as this is somehow returning incorrect KieBase sometime.

            KieBase kbase = new KieHelper()
                             .addContent(ruleString, ResourceType.DRL)
                             .build();

        /*
        //Same issue occurs even if I create different file with different name instead of using KieHelper.

        KieServices ks = KieServices.Factory.get();
        KieFileSystem kfs = ks.newKieFileSystem();

        String inMemoryDrlFileName = "src/main/resources/inmemoryrules-" + taskCounter + ".drl";

        kfs.write(inMemoryDrlFileName, ruleString);

        KieBuilder kieBuilder = ks.newKieBuilder(kfs).buildAll();
        KieContainer kContainer = ks.newKieContainer(kieBuilder.getKieModule().getReleaseId());
        KieBaseConfiguration kbconf = ks.newKieBaseConfiguration();
        KieBase kbase = kContainer.newKieBase(kbconf);
        */


            StatelessKieSession kieSession = kbase.newStatelessKieSession();
            kieSession.execute(classObj);

            System.out.println("(" + Thread.currentThread().getName() + ") " +
                    uniqueId + "_" + classObj.droolLogging );

            //Important: 
            //  To see if correct rule is executed, task no. printed by variable 'droolLogging'
            //  should match with uniqueId

            if(classObj.droolLogging == null || classObj.droolLogging.size() != 1 ||
                    !classObj.droolLogging.get(0).endsWith(uniqueId)) {
                failedTasks.put("" + taskCounter, classObj.droolLogging);
            }
        });

        logger.info("Failed:\n {}", failedTasks);
    }


OUTPUT:
    (ForkJoinPool.commonPool-worker-1) Task-37_[RuleOf-Task-4]
    (ForkJoinPool.commonPool-worker-6) Task-8_[RuleOf-Task-4]
    (ForkJoinPool.commonPool-worker-3) Task-18_[RuleOf-Task-4]
    (ForkJoinPool.commonPool-worker-2) Task-108_[RuleOf-Task-4]
    (main) Task-78_[RuleOf-Task-4]
    (ForkJoinPool.commonPool-worker-7) Task-52_[RuleOf-Task-4]
    (ForkJoinPool.commonPool-worker-4) Task-97_[RuleOf-Task-4]
    (ForkJoinPool.commonPool-worker-5) Task-4_[RuleOf-Task-4]
    (ForkJoinPool.commonPool-worker-3) Task-19_[RuleOf-Task-19]
    (ForkJoinPool.commonPool-worker-5) Task-5_[RuleOf-Task-5]
    (ForkJoinPool.commonPool-worker-2) Task-109_[RuleOf-Task-109]
    (ForkJoinPool.commonPool-worker-7) Task-53_[RuleOf-Task-53]
    (ForkJoinPool.commonPool-worker-1) Task-38_[RuleOf-Task-38]
    (ForkJoinPool.commonPool-worker-4) Task-98_[RuleOf-Task-98]
    .... more

    Failed (12):
        {88=[RuleOf-Task-77], 78=[RuleOf-Task-4], 68=[RuleOf-Task-60], 37=[RuleOf-Task-4], 15=[RuleOf-Task-1], 18=[RuleOf-Task-4], 7=[RuleOf-Task-11], 8=[RuleOf-Task-4], 108=[RuleOf-Task-4], 71=[RuleOf-Task-76], 52=[RuleOf-Task-4], 97=[RuleOf-Task-4]}

This shows:

 - Rule of task 77 was executed in task 88
 - Rule of task 4 was executed in task 78
 - Rule of task 60 was executed in task 68
 - ....

This is wrong. For correct results, in each process, Rule of task X should be executed in task X only.

知道这背后的原因是什么吗?


更新: 以上代码仅用于测试目的,以查看 KieBase 的生成和执行在多线程环境中的表现。实际用例如下:

用例:

我们有一套明智的规则。对于每个类别,需要执行一组特定的规则。

Example:

 for category 1 , I need to execute rule101, rule102, rule103

 for category 2 , I need to execute rule201, rule202, rule203
 ....

 Note: During evaluation, rules of category X should NOT interfere with Rules of category Y, i.e., they should be run independently.

因为没有。类别很大,我们正在并行构建 KieBases(针对每个类别)并将其存储 x 分钟。 x 分钟后,我们检查任何类别的规则是否已更改,如果更改,KieBase 将再次为这些类别编译(这将再次并行)。

此外,可以在 运行 时添加新类别。因此,对于新添加的类别,也遵循上述程序。

   category1 -> KieBase1   (compiled rules: rule101, rule102, rule103)
   category2 -> KieBase2   (compiled rules: rule201, rule202, rule203)
   category3 -> KieBase3

   Note: As already mentioned above, execution of KieBase X should NOT interfere with execution of KieBase Y as KieBases are created category wise and for each category, only particular set of rules should be executed.

最终证明是 Drools 中的一个错误,因为 KieHelper 在多线程环境中使用不安全..

在 Drools Dev Community 的一位成员提出解决方案后,以下似乎是上述异常的原因以及解决此问题的解决方法:

问题的根本原因:KieHelper 使用相同的默认 releaseId 构建 KieModule。

解决方案:为每个构建使用不同的版本 ID。

代码:(在上述代码中使用)

KieServices ks = KieServices.Factory.get();
KieFileSystem kfs = ks.newKieFileSystem();
kfs.write("src/main/resources/rules.drl", ruleString);

ReleaseId releaseId = ks.newReleaseId("com.rule", "test" + taskCounter, "1.0.0");
kfs.generateAndWritePomXML(releaseId);

KieBuilder kieBuilder = ks.newKieBuilder(kfs).buildAll();
Results results = kieBuilder.getResults();

if (results.hasMessages(Message.Level.ERROR)) {
  throw new RuntimeException(results.getMessages().toString());
}

KieContainer kContainer = ks.newKieContainer(releaseId);
KieBase kbase = kContainer.newKieBase(ks.newKieBaseConfiguration());

StatelessKieSession kieSession = kbase.newStatelessKieSession();
kieSession.execute(classObj);

已通过 运行 >500 个并行进程验证,未出现问题。