Apache Flink,任务槽数与 env.setParallelism

Apache Flink, number of Task Slot vs env.setParallelism

您能解释一下 Apache Flink v1.9 中任务槽和并行性之间的区别吗?


public class AverageSensorReadings {
 public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  int paralellism = env.getParallelism();
  int maxParal = env.getMaxParallelism();

  // ingest sensor stream
  DataStream < SensorReading > sensorData = env
   // SensorSource generates random temperature readings
   .addSource(new SensorSource())
   // assign timestamps and watermarks which are required for event time
   .assignTimestampsAndWatermarks(new SensorTimeAssigner());

  DataStream < SensorReading > avgTemp = sensorData
   // convert Fahrenheit to Celsius using and inlined map function
   .map(r -> new SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)))
   // organize stream by sensor
   .keyBy(r -> r.id)
   // group readings in 1 second windows
   .timeWindow(Time.seconds(4))
   // compute average temperature using a user-defined function
   .apply(new TemperatureAverager());

  // print result stream to standard out
  //avgTemp.print();
  System.out.println("paral: " + paralellism + " max paral: " + maxParal);
  // execute application
  env.execute("Compute average sensor temperature");
 }

 public static class TemperatureAverager extends RichWindowFunction < SensorReading, SensorReading, String, TimeWindow > {

  /**
   * apply() is invoked once for each window.
   *
   * @param sensorId the key (sensorId) of the window
   * @param window meta data for the window
   * @param input an iterable over the collected sensor readings that were assigned to the window
   * @param out a collector to emit results from the function
   */
  @Override
  public void apply(String sensorId, TimeWindow window, Iterable < SensorReading > input, Collector < SensorReading > out) {
   System.out.println("APPLY FUNCTION START POINT");
   System.out.println("sensorId: " + sensorId + "\n");

   // compute the average temperature
   int cnt = 0;
   double sum = 0.0;
   for (SensorReading r: input) {
    System.out.println("collected item: " + r);
    cnt++;
    sum += r.temperature;
   }
   double avgTemp = sum / cnt;
   System.out.println("APPLY FUNCTION END POINT");
   System.out.println("----------------------------\n\n");
   // emit a SensorReading with the average temperature
   out.collect(new SensorReading(sensorId, window.getEnd(), avgTemp));
  }
 }
}

通常每个插槽将 运行 您的管道的一个并行实例。因此,作业的并行度与 运行 它所需的槽数相同。 (通过使用插槽共享组,您可以将特定任务强制放入它们自己的插槽中,这将增加所需的插槽数量。)

每个任务(包括一个或多个链接在一起的运算符)运行在一个 Java 线程中。

任务管理器可以创建任意数量的插槽。典型配置每个插槽使用 1 CPU 个核心,但对于处理要求很高的管道,您可能希望每个插槽有 2 个或更多核心,而对于大部分空闲的管道,您可能会朝另一个方向配置多个插槽每个核心。

任务管理器中的所有 tasks/threads 运行 宁将简单地竞争任务管理器可以从托管它的机器或容器获得的 CPU 资源。

所有状态对于使用它的一个运算符实例(任务)都是本地的,因此所有访问都发生在该线程内。假设可能存在竞争条件的地方是 ProcessFunction 中的 onTimer 和 processElement 回调之间,但这些方法是同步的,因此您不必担心这一点。因为所有状态访问都是本地的,所以这导致高吞吐量、低延迟和高可扩展性。

在您的示例中,如果并行度为 2,那么您将有两个槽在数据的不同切片上独立执行相同的逻辑。如果他们使用状态,那么这将是 Flink 管理的键分区状态,您可以将其视为分片 key/value 存储。

在传感器数据及时的情况下windows,你完全不必担心多线程。 keyBy 将对数据进行分区,以便一个实例将处理所有事件,windows 用于某些传感器,另一个实例(假设有两个)将处理其余的。