Akka 流 Grafstage

Akka Streams GraphStage

在 Akka Streams 中推荐使用 GraphStage,但我找不到任何关于在 Java 中使用 getStageActor() 方法的文档(我找到的所有文档都使用 Scala ).

如何将以下代码转换为 Java?

lazy val self: StageActor = getStageActor(onMessage)

private def onMessage(x: (ActorRef, Any)): Unit =
{
  x match {
    case (_, msg: String) =>
      log.info("received msg, queueing: {} ", msg)
      messages = messages.enqueue(msg)
      pump()
  }
}

我以前没有使用过getStageActor(),因此无法提供太多帮助。 Scala到Java的代码转换,一般情况下,如果需要的话,打包一个jar,包含Scala版本的类,供Java应用程序使用;否则请考虑使用 Java 反编译器(例如 cfr、procyon)反编译 Scala 编译的 类 并根据需要优化反编译的 Java 代码。

例如,反编译以下虚拟 Scala 代码将有助于揭示 lazy valpattern matching 的骨架 Java 方式:

class Foo {
  lazy val self = dummy(bar(_: String))
  def dummy(u: Unit) = 1
  private def bar(x: String): Unit = {
    x match {
      case "blah" => println(s"x = $x")
    }
  }
}

如下面的反编译代码所示,lazy val 是在 Java 中完成的,其值包含在带有 bitmap[=17=] 布尔标志的 synchronized 块中,并且 pattern matching 转换为 ifMatchError 异常:

$ java -jar /path/to/decompiler/cfr_0_125.jar Foo.class

private int self;
private volatile boolean bitmap[=11=];

private int self$lzycompute() {
    Foo foo = this;
    synchronized (foo) {
        if (!this.bitmap[=11=]) {
            new Serializable(this){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ Foo $outer;

                public final void apply(String x) {
                    this.$outer.Foo$$bar(x);
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                }
            };
            this.self = this.dummy(BoxedUnit.UNIT);
            this.bitmap[=11=] = true;
        }
        return this.self;
    }
}

public int self() {
    return this.bitmap[=11=] ? this.self : this.self$lzycompute();
}

public int dummy(BoxedUnit u) {
    return 1;
}

public void Foo$$bar(String x) {
    String string = x;
    if ("blah".equals(string)) {
        Predef$.MODULE$.println((Object)new StringContext(
                (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"x = ", ""})
            ).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{x}))
        );
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
        return;
    }
    throw new MatchError((Object)string);
}

根据getStageActor method documentation,它接受类型

的值
scala.Function1<scala.Tuple2<ActorRef,java.lang.Object>, scala.runtime.BoxedUnit>

在 Scala 中看起来像

((ActorRef, AnyRef)) => Unit

在 Java 中,此类型在语义上等同于(使用 Function 接口)

Function<Tuple<ActorRef, Object>, Void>

其中 Tuple<A, B> 是一个 class,它包含两个类型为 AB 的值。

因此,要调用getStageActor方法,您需要创建一个上述类型的值。您可以通过构造 class 扩展 AbstractFunction1:

的实例来直接完成
import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

getStateActor(new AbstractFunction1<Tuple2<ActorRef, Object>, BoxedUnit>() {
    @Override
    public BoxedUnit apply(Tuple2<ActorRef, Object> args) {
        return BoxedUnit.UNIT;
    }
});

如果您使用 Java 8,可以使用 lambda 表达式在语法上更好地实现。

如果你使用Scala 2.12+,那么scala.Function1是一个函数式接口,你可以直接使用lambda表达式:

getStateActor((args: Tuple2<ActorRef, Object>) -> BoxedUnit.UNIT);

如果您使用旧版本的 Scala,那么由于 traits 的编译方式,Function1 不是函数式接口,您将需要使用 scala-java8-compat 库。有了它,代码看起来像

import static scala.compat.java8.JFunction.*;

getStateActor(func((args: Tuple2<ActorRef, Object>) -> BoxedUnit.UNIT));

然后,要实现函数的逻辑,您可以使用 _1()_2() 方法访问元组的元素:

(args: Tuple2<ActorRef, Object>) -> {
    Object msg = args._2();
    if (msg instanceof String) {
        log.info("received msg, queueing: {} ", msg);
        messages = messages.enqueue((String) msg);
        pump();
    }
    return BoxedUnit.UNIT;
}

这是您要转换的逻辑的直接翻译。