Apache Flink,任务槽数与 env.setParallelism
Apache Flink, number of Task Slot vs env.setParallelism
您能解释一下 Apache Flink v1.9 中任务槽和并行性之间的区别吗?
这是我目前的理解
- Flink 说 TaskManager 是工作进程。通常你应该每台电脑有一个 TaskManager。
- 假设我有 3 台计算机,它们都有 16 CPU 个内核。每台计算机都会有 TaskManager。因此我将有 3 个 TaskManager
- 我想如果一台计算机有 16 cpu 个核心,那么 TaskManager 最多可以创建 16 个任务槽。因此那里有一个 CPU 隔离。但是 Flink 说 link => “请注意,这里没有 CPU 隔离;当前插槽仅分隔任务的托管内存。”
- 这意味着 16 个插槽 = 16 个线程?还有
numberOfSlot can be >= numberOfCpuCores
?
如果任务槽表示线程,这可能导致"shared to access data problem, race condition"等..?这是我的第一个问题。
- 第二个问题是我写到 post => 任务槽和并行性之间差异的开头。我说的是 env.setparalellism(number)。
- 假设我的并行数 = 2
- 那么对于每个任务槽(线程或其他什么)将使用 2 个线程执行?
- 如果是,这可能会导致 "shared to access data problem, race condition" 等.?
- 如果不是,并行度是什么意思?
- 这是例子。在此示例中,由于线程环境,我是否应该关心编写
apply()
方法?:
public class AverageSensorReadings {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
int paralellism = env.getParallelism();
int maxParal = env.getMaxParallelism();
// ingest sensor stream
DataStream < SensorReading > sensorData = env
// SensorSource generates random temperature readings
.addSource(new SensorSource())
// assign timestamps and watermarks which are required for event time
.assignTimestampsAndWatermarks(new SensorTimeAssigner());
DataStream < SensorReading > avgTemp = sensorData
// convert Fahrenheit to Celsius using and inlined map function
.map(r -> new SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)))
// organize stream by sensor
.keyBy(r -> r.id)
// group readings in 1 second windows
.timeWindow(Time.seconds(4))
// compute average temperature using a user-defined function
.apply(new TemperatureAverager());
// print result stream to standard out
//avgTemp.print();
System.out.println("paral: " + paralellism + " max paral: " + maxParal);
// execute application
env.execute("Compute average sensor temperature");
}
public static class TemperatureAverager extends RichWindowFunction < SensorReading, SensorReading, String, TimeWindow > {
/**
* apply() is invoked once for each window.
*
* @param sensorId the key (sensorId) of the window
* @param window meta data for the window
* @param input an iterable over the collected sensor readings that were assigned to the window
* @param out a collector to emit results from the function
*/
@Override
public void apply(String sensorId, TimeWindow window, Iterable < SensorReading > input, Collector < SensorReading > out) {
System.out.println("APPLY FUNCTION START POINT");
System.out.println("sensorId: " + sensorId + "\n");
// compute the average temperature
int cnt = 0;
double sum = 0.0;
for (SensorReading r: input) {
System.out.println("collected item: " + r);
cnt++;
sum += r.temperature;
}
double avgTemp = sum / cnt;
System.out.println("APPLY FUNCTION END POINT");
System.out.println("----------------------------\n\n");
// emit a SensorReading with the average temperature
out.collect(new SensorReading(sensorId, window.getEnd(), avgTemp));
}
}
}
通常每个插槽将 运行 您的管道的一个并行实例。因此,作业的并行度与 运行 它所需的槽数相同。 (通过使用插槽共享组,您可以将特定任务强制放入它们自己的插槽中,这将增加所需的插槽数量。)
每个任务(包括一个或多个链接在一起的运算符)运行在一个 Java 线程中。
任务管理器可以创建任意数量的插槽。典型配置每个插槽使用 1 CPU 个核心,但对于处理要求很高的管道,您可能希望每个插槽有 2 个或更多核心,而对于大部分空闲的管道,您可能会朝另一个方向配置多个插槽每个核心。
任务管理器中的所有 tasks/threads 运行 宁将简单地竞争任务管理器可以从托管它的机器或容器获得的 CPU 资源。
所有状态对于使用它的一个运算符实例(任务)都是本地的,因此所有访问都发生在该线程内。假设可能存在竞争条件的地方是 ProcessFunction 中的 onTimer 和 processElement 回调之间,但这些方法是同步的,因此您不必担心这一点。因为所有状态访问都是本地的,所以这导致高吞吐量、低延迟和高可扩展性。
在您的示例中,如果并行度为 2,那么您将有两个槽在数据的不同切片上独立执行相同的逻辑。如果他们使用状态,那么这将是 Flink 管理的键分区状态,您可以将其视为分片 key/value 存储。
在传感器数据及时的情况下windows,你完全不必担心多线程。 keyBy 将对数据进行分区,以便一个实例将处理所有事件,windows 用于某些传感器,另一个实例(假设有两个)将处理其余的。
您能解释一下 Apache Flink v1.9 中任务槽和并行性之间的区别吗?
这是我目前的理解
- Flink 说 TaskManager 是工作进程。通常你应该每台电脑有一个 TaskManager。
- 假设我有 3 台计算机,它们都有 16 CPU 个内核。每台计算机都会有 TaskManager。因此我将有 3 个 TaskManager
- 我想如果一台计算机有 16 cpu 个核心,那么 TaskManager 最多可以创建 16 个任务槽。因此那里有一个 CPU 隔离。但是 Flink 说 link => “请注意,这里没有 CPU 隔离;当前插槽仅分隔任务的托管内存。”
- 这意味着 16 个插槽 = 16 个线程?还有
numberOfSlot can be >= numberOfCpuCores
?
如果任务槽表示线程,这可能导致"shared to access data problem, race condition"等..?这是我的第一个问题。
- 第二个问题是我写到 post => 任务槽和并行性之间差异的开头。我说的是 env.setparalellism(number)。
- 假设我的并行数 = 2
- 那么对于每个任务槽(线程或其他什么)将使用 2 个线程执行?
- 如果是,这可能会导致 "shared to access data problem, race condition" 等.?
- 如果不是,并行度是什么意思?
- 这是例子。在此示例中,由于线程环境,我是否应该关心编写
apply()
方法?:
public class AverageSensorReadings {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
int paralellism = env.getParallelism();
int maxParal = env.getMaxParallelism();
// ingest sensor stream
DataStream < SensorReading > sensorData = env
// SensorSource generates random temperature readings
.addSource(new SensorSource())
// assign timestamps and watermarks which are required for event time
.assignTimestampsAndWatermarks(new SensorTimeAssigner());
DataStream < SensorReading > avgTemp = sensorData
// convert Fahrenheit to Celsius using and inlined map function
.map(r -> new SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)))
// organize stream by sensor
.keyBy(r -> r.id)
// group readings in 1 second windows
.timeWindow(Time.seconds(4))
// compute average temperature using a user-defined function
.apply(new TemperatureAverager());
// print result stream to standard out
//avgTemp.print();
System.out.println("paral: " + paralellism + " max paral: " + maxParal);
// execute application
env.execute("Compute average sensor temperature");
}
public static class TemperatureAverager extends RichWindowFunction < SensorReading, SensorReading, String, TimeWindow > {
/**
* apply() is invoked once for each window.
*
* @param sensorId the key (sensorId) of the window
* @param window meta data for the window
* @param input an iterable over the collected sensor readings that were assigned to the window
* @param out a collector to emit results from the function
*/
@Override
public void apply(String sensorId, TimeWindow window, Iterable < SensorReading > input, Collector < SensorReading > out) {
System.out.println("APPLY FUNCTION START POINT");
System.out.println("sensorId: " + sensorId + "\n");
// compute the average temperature
int cnt = 0;
double sum = 0.0;
for (SensorReading r: input) {
System.out.println("collected item: " + r);
cnt++;
sum += r.temperature;
}
double avgTemp = sum / cnt;
System.out.println("APPLY FUNCTION END POINT");
System.out.println("----------------------------\n\n");
// emit a SensorReading with the average temperature
out.collect(new SensorReading(sensorId, window.getEnd(), avgTemp));
}
}
}
通常每个插槽将 运行 您的管道的一个并行实例。因此,作业的并行度与 运行 它所需的槽数相同。 (通过使用插槽共享组,您可以将特定任务强制放入它们自己的插槽中,这将增加所需的插槽数量。)
每个任务(包括一个或多个链接在一起的运算符)运行在一个 Java 线程中。
任务管理器可以创建任意数量的插槽。典型配置每个插槽使用 1 CPU 个核心,但对于处理要求很高的管道,您可能希望每个插槽有 2 个或更多核心,而对于大部分空闲的管道,您可能会朝另一个方向配置多个插槽每个核心。
任务管理器中的所有 tasks/threads 运行 宁将简单地竞争任务管理器可以从托管它的机器或容器获得的 CPU 资源。
所有状态对于使用它的一个运算符实例(任务)都是本地的,因此所有访问都发生在该线程内。假设可能存在竞争条件的地方是 ProcessFunction 中的 onTimer 和 processElement 回调之间,但这些方法是同步的,因此您不必担心这一点。因为所有状态访问都是本地的,所以这导致高吞吐量、低延迟和高可扩展性。
在您的示例中,如果并行度为 2,那么您将有两个槽在数据的不同切片上独立执行相同的逻辑。如果他们使用状态,那么这将是 Flink 管理的键分区状态,您可以将其视为分片 key/value 存储。
在传感器数据及时的情况下windows,你完全不必担心多线程。 keyBy 将对数据进行分区,以便一个实例将处理所有事件,windows 用于某些传感器,另一个实例(假设有两个)将处理其余的。