volatile 变量的通量可见性问题
Flux visibility problems with volatile variable
我有一个简单的 Spring 控制器,它有一个 Spring 服务作为依赖项。
在服务 class 中,我有一个名为 flag 的 int 类型的静态易失性字段。
当我通过我的控制器调用 createFlux() 方法时,标志设置为 5,然后创建一个新的 Flux,它每秒检查一次标志,并根据标志值打印一条消息。由于 delayElements 方法语义,代码将并行执行。之后,如果我调用更改标志值的 changeFlag() 方法,并且由于标志变量是易变的,我希望打印的消息发生变化,但这并没有发生。
代码如下:
@RestController
public class MyController {
@Autowired private MyService myService;
@GetMapping("createFlux")
public void createFlux() {
myService.createFlux();
}
@GetMapping("changeFlag")
public void changeFlag() {
myService.changeFlag();
}
}
@Service
public class MyService {
private static volatile int flag = 3;
public void changeFlag() {
flag = 3;
System.out.println("############# Flag = " + flag);
}
public void createFlux() {
flag = 5;
System.out.println("Flag = " + flag);
Flux.generate(sink -> {
if (flag == 3) {
sink.next("Stop");
} else {
sink.next("Start");
}
}).delayElements(Duration.ofSeconds(1)).subscribe(s -> System.out.println(Thread.currentThread().getName() + " : " + s));
}
}
这是控制台中的输出:
Flag = 5
parallel-1 : Start
parallel-2 : Start
parallel-3 : Start
parallel-4 : Start
############# Flag = 3
parallel-5 : Start
parallel-6 : Start
parallel-7 : Start
parallel-8 : Start
parallel-1 : Start
parallel-2 : Start
parallel-3 : Start
parallel-4 : Start
parallel-5 : Start
parallel-6 : Start
parallel-7 : Start
parallel-8 : Start
parallel-1 : Start
parallel-2 : Start
parallel-3 : Start
parallel-4 : Start
parallel-5 : Start
parallel-6 : Start
parallel-7 : Start
parallel-8 : Start
parallel-1 : Start
parallel-2 : Start
parallel-3 : Start
parallel-4 : Start
parallel-5 : Start
parallel-6 : Start
parallel-7 : Start
parallel-8 : Start
parallel-1 : Stop
parallel-2 : Stop
parallel-3 : Stop
parallel-4 : Stop
parallel-5 : Stop
parallel-6 : Stop
从输出中可以看出,即使 flag 的值更改为 3,它仍继续打印消息 Start。一段时间后,打印的消息发生了变化。我想有一些缓存或类似的东西,但 volatile 变量没有被缓存。
问题是 - 这是错误还是我遗漏了什么?
为一堆元素立即执行生成消费者。您可以通过在 generate
:
之后添加日志轻松确认
Flux.generate(...).doOnNext(e -> log.info("executed: {}", e))
打印:
2022-01-20 13:31:50,346 INFO parallel-1 - Flag = 5
2022-01-20 13:31:50,349 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,352 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,352 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,352 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,352 INFO parallel-1 - executed: Start
generate
方法根据下游的需求发出元素。它首先生成 32 个元素并缓冲它们。当下游开始处理元素并且缓冲区大小低于阈值时,它会发出更多元素。
我有一个简单的 Spring 控制器,它有一个 Spring 服务作为依赖项。 在服务 class 中,我有一个名为 flag 的 int 类型的静态易失性字段。 当我通过我的控制器调用 createFlux() 方法时,标志设置为 5,然后创建一个新的 Flux,它每秒检查一次标志,并根据标志值打印一条消息。由于 delayElements 方法语义,代码将并行执行。之后,如果我调用更改标志值的 changeFlag() 方法,并且由于标志变量是易变的,我希望打印的消息发生变化,但这并没有发生。
代码如下:
@RestController
public class MyController {
@Autowired private MyService myService;
@GetMapping("createFlux")
public void createFlux() {
myService.createFlux();
}
@GetMapping("changeFlag")
public void changeFlag() {
myService.changeFlag();
}
}
@Service
public class MyService {
private static volatile int flag = 3;
public void changeFlag() {
flag = 3;
System.out.println("############# Flag = " + flag);
}
public void createFlux() {
flag = 5;
System.out.println("Flag = " + flag);
Flux.generate(sink -> {
if (flag == 3) {
sink.next("Stop");
} else {
sink.next("Start");
}
}).delayElements(Duration.ofSeconds(1)).subscribe(s -> System.out.println(Thread.currentThread().getName() + " : " + s));
}
}
这是控制台中的输出:
Flag = 5
parallel-1 : Start
parallel-2 : Start
parallel-3 : Start
parallel-4 : Start
############# Flag = 3
parallel-5 : Start
parallel-6 : Start
parallel-7 : Start
parallel-8 : Start
parallel-1 : Start
parallel-2 : Start
parallel-3 : Start
parallel-4 : Start
parallel-5 : Start
parallel-6 : Start
parallel-7 : Start
parallel-8 : Start
parallel-1 : Start
parallel-2 : Start
parallel-3 : Start
parallel-4 : Start
parallel-5 : Start
parallel-6 : Start
parallel-7 : Start
parallel-8 : Start
parallel-1 : Start
parallel-2 : Start
parallel-3 : Start
parallel-4 : Start
parallel-5 : Start
parallel-6 : Start
parallel-7 : Start
parallel-8 : Start
parallel-1 : Stop
parallel-2 : Stop
parallel-3 : Stop
parallel-4 : Stop
parallel-5 : Stop
parallel-6 : Stop
从输出中可以看出,即使 flag 的值更改为 3,它仍继续打印消息 Start。一段时间后,打印的消息发生了变化。我想有一些缓存或类似的东西,但 volatile 变量没有被缓存。
问题是 - 这是错误还是我遗漏了什么?
为一堆元素立即执行生成消费者。您可以通过在 generate
:
Flux.generate(...).doOnNext(e -> log.info("executed: {}", e))
打印:
2022-01-20 13:31:50,346 INFO parallel-1 - Flag = 5
2022-01-20 13:31:50,349 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,352 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,352 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,352 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,352 INFO parallel-1 - executed: Start
generate
方法根据下游的需求发出元素。它首先生成 32 个元素并缓冲它们。当下游开始处理元素并且缓冲区大小低于阈值时,它会发出更多元素。