RxJava 使用优化请求
RxJava usage optimization request
今天我尝试解决了一个小挑战:
你是一家拥有 500 个办事处的大公司,你想计算全球收入(每个办事处的收入总和)。
每个办公室公开一个服务来获得收入。该调用需要一定的延迟(网络、数据库访问、...)。
显然,您希望尽快获得全球收入。
首先我在 python 中进行了尝试,结果相当不错:
import asyncio
import time
DELAYS = (475, 500, 375, 100, 250, 125, 150, 225, 200, 425, 275, 350, 450, 325, 400, 300, 175)
class Office:
def __init__(self, delay, name, revenue):
self.delay = delay
self.name = name
self.revenue = revenue
async def compute(self):
await asyncio.sleep(self.delay / 1000)
print(f'{self.name} finished in {self.delay}ms')
return self.revenue
async def main(offices, totest):
computed = sum(await asyncio.gather(*[o.compute() for o in offices]))
verdict = ['nok', 'ok'][computed == totest]
print(f'Sum of revenues = {computed} {verdict}')
if __name__ == "__main__":
offices = [Office(DELAYS[i % len(DELAYS)], f'Office-{i}', 3 * i + 10) for i in range(500)]
totest = sum(o.revenue for o in offices)
start = time.perf_counter()
asyncio.run(main(offices, totest))
end = time.perf_counter()
print(f'Ends in {(end-start)*1000:.3f}ms')
在我的电脑上大约需要 500 毫秒,这是理想情况(因为 500 毫秒是最大延迟)
接下来,我在 java 中尝试使用 RxJava:
import java.util.concurrent.TimeUnit;
public class Office {
private int sleepTime;
private String name;
private int revenue;
public Office(int sleepTime, String name, int revenue) {
this.sleepTime = sleepTime;
this.name = name;
this.revenue = revenue;
}
public int getRevenue() {
return revenue;
}
public int compute() {
try {
TimeUnit.MILLISECONDS.sleep(this.sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("%s finished in %dms on thread %d%n", this.name, this.sleepTime, Thread.currentThread().getId());
return this.revenue;
}
}
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
public class Tester {
private static int[] DELAYS = {475, 500, 375, 100, 250, 125, 150, 225, 200, 425, 275, 350, 450, 325, 400, 300, 175};
public static void main(String[] args) {
final ArrayList<Office> offices = new ArrayList<>();
for (int i = 0; i < 500; i++) {
offices.add(new Office(DELAYS[i % DELAYS.length], String.format("Office-%d", i), 3 * i + 10));
}
int totest = offices.stream().mapToInt(Office::getRevenue).sum();
final Instant start = Instant.now();
final Flowable<Office> officeObservable = Flowable.fromIterable(offices);
int computation = officeObservable.parallel(500).runOn(Schedulers.io()).map(Office::compute).reduce(Integer::sum).blockingSingle();
boolean verdict = computation == totest;
System.out.println("" + computation + " " + (verdict ? "ok" : "nok"));
final Instant end = Instant.now();
System.out.printf("Ends in %dms%n", Duration.between(start, end).toMillis());
}
}
在我的电脑上,大约需要 1000 毫秒(有 500 个线程的池!!)。
当然,我尝试了不同数量的线程,但结果最差或相似。
我不想比较Python和Java,我只想:
如有错误请解释
更好的方法?
此外,python async 仅使用一个线程,但在 Java 中,我没有找到如何不使用多线程来获得类似的结果。
也许有人可以帮助我? :-)
很简单。在 python 端,您以异步模式等待(不阻塞)
在 java 方面,您等待阻塞代码,因此有所不同。
java中的正确代码应该是:
package com.test;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Publisher;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
public class TestReactive {
public static class Office {
private int sleepTime;
private String name;
private int revenue;
public Office(int sleepTime, String name, int revenue) {
this.sleepTime = sleepTime;
this.name = name;
this.revenue = revenue;
}
public int getRevenue() {
return revenue;
}
public Publisher<Integer> compute() {
return Single.just("")
.delay(this.sleepTime, TimeUnit.MILLISECONDS)
.map(x-> {
System.out.printf("%s finished in %dms on thread %d%n", this.name, this.sleepTime, Thread.currentThread().getId());
return this.revenue;
}).toFlowable();
}
}
private static int[] DELAYS = {475, 500, 375, 100, 250, 125, 150, 225, 200, 425, 275, 350, 450, 325, 400, 300, 175};
public static void main(String[] args) {
final ArrayList<Office> offices = new ArrayList<>();
for (int i = 0; i < 500; i++) {
offices.add(new Office(DELAYS[i % DELAYS.length], String.format("Office-%d", i), 3 * i + 10));
}
int totest = offices.stream().mapToInt(Office::getRevenue).sum();
final Instant start = Instant.now();
final Flowable<Office> officeObservable = Flowable.fromIterable(offices);
int computation = officeObservable.parallel(2).runOn(Schedulers.io()).flatMap(Office::compute).reduce(Integer::sum).blockingSingle();
boolean verdict = computation == totest;
System.out.println("" + computation + " " + (verdict ? "ok" : "nok"));
final Instant end = Instant.now();
System.out.printf("Ends in %dms%n", Duration.between(start, end).toMillis());
}
}
编辑:我将并行设置为 2,但谁在乎呢,您可以放置一个线程,因为它不是 CPU 限制问题。
经过多次尝试(感谢M.T的帮助),终于Java实现了!
public class Office {
private int sleepTime;
private int revenue;
public Office(int sleepTime, int revenue) {
this.sleepTime = sleepTime;
this.revenue = revenue;
}
public int getRevenue() {
return revenue;
}
public Single<Integer> compute() {
return Single.timer(sleepTime, TimeUnit.MILLISECONDS).map(l -> this.revenue);
}
}
public class Tester {
private static int[] DELAYS = {475, 500, 375, 100, 250, 125, 150, 225, 200, 425, 275, 350, 450, 325, 400, 300, 175};
public static void main(String[] args) {
final ArrayList<Office> offices = new ArrayList<>();
for (int i = 0; i < 1_000_000; i++) {
offices.add(new Office(DELAYS[i % DELAYS.length], 1));
}
int totest = offices.stream().mapToInt(Office::getRevenue).sum();
final Instant start = Instant.now();
final Flowable<Office> officeObservable = Flowable.fromIterable(offices);
int computation = officeObservable.flatMapSingle(Office::compute).reduce(Integer::sum).blockingGet();
boolean verdict = computation == totest;
System.out.println("" + computation + " " + (verdict ? "ok" : "nok"));
final Instant end = Instant.now();
System.out.printf("Ends in %dms%n", Duration.between(start, end).toMillis());
}
}
这段代码速度超快! 1_000_000 个办公室 2 秒!
今天我尝试解决了一个小挑战:
你是一家拥有 500 个办事处的大公司,你想计算全球收入(每个办事处的收入总和)。
每个办公室公开一个服务来获得收入。该调用需要一定的延迟(网络、数据库访问、...)。
显然,您希望尽快获得全球收入。
首先我在 python 中进行了尝试,结果相当不错:
import asyncio
import time
DELAYS = (475, 500, 375, 100, 250, 125, 150, 225, 200, 425, 275, 350, 450, 325, 400, 300, 175)
class Office:
def __init__(self, delay, name, revenue):
self.delay = delay
self.name = name
self.revenue = revenue
async def compute(self):
await asyncio.sleep(self.delay / 1000)
print(f'{self.name} finished in {self.delay}ms')
return self.revenue
async def main(offices, totest):
computed = sum(await asyncio.gather(*[o.compute() for o in offices]))
verdict = ['nok', 'ok'][computed == totest]
print(f'Sum of revenues = {computed} {verdict}')
if __name__ == "__main__":
offices = [Office(DELAYS[i % len(DELAYS)], f'Office-{i}', 3 * i + 10) for i in range(500)]
totest = sum(o.revenue for o in offices)
start = time.perf_counter()
asyncio.run(main(offices, totest))
end = time.perf_counter()
print(f'Ends in {(end-start)*1000:.3f}ms')
在我的电脑上大约需要 500 毫秒,这是理想情况(因为 500 毫秒是最大延迟)
接下来,我在 java 中尝试使用 RxJava:
import java.util.concurrent.TimeUnit;
public class Office {
private int sleepTime;
private String name;
private int revenue;
public Office(int sleepTime, String name, int revenue) {
this.sleepTime = sleepTime;
this.name = name;
this.revenue = revenue;
}
public int getRevenue() {
return revenue;
}
public int compute() {
try {
TimeUnit.MILLISECONDS.sleep(this.sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("%s finished in %dms on thread %d%n", this.name, this.sleepTime, Thread.currentThread().getId());
return this.revenue;
}
}
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
public class Tester {
private static int[] DELAYS = {475, 500, 375, 100, 250, 125, 150, 225, 200, 425, 275, 350, 450, 325, 400, 300, 175};
public static void main(String[] args) {
final ArrayList<Office> offices = new ArrayList<>();
for (int i = 0; i < 500; i++) {
offices.add(new Office(DELAYS[i % DELAYS.length], String.format("Office-%d", i), 3 * i + 10));
}
int totest = offices.stream().mapToInt(Office::getRevenue).sum();
final Instant start = Instant.now();
final Flowable<Office> officeObservable = Flowable.fromIterable(offices);
int computation = officeObservable.parallel(500).runOn(Schedulers.io()).map(Office::compute).reduce(Integer::sum).blockingSingle();
boolean verdict = computation == totest;
System.out.println("" + computation + " " + (verdict ? "ok" : "nok"));
final Instant end = Instant.now();
System.out.printf("Ends in %dms%n", Duration.between(start, end).toMillis());
}
}
在我的电脑上,大约需要 1000 毫秒(有 500 个线程的池!!)。
当然,我尝试了不同数量的线程,但结果最差或相似。
我不想比较Python和Java,我只想:
如有错误请解释
更好的方法?
此外,python async 仅使用一个线程,但在 Java 中,我没有找到如何不使用多线程来获得类似的结果。
也许有人可以帮助我? :-)
很简单。在 python 端,您以异步模式等待(不阻塞) 在 java 方面,您等待阻塞代码,因此有所不同。
java中的正确代码应该是:
package com.test;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Publisher;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
public class TestReactive {
public static class Office {
private int sleepTime;
private String name;
private int revenue;
public Office(int sleepTime, String name, int revenue) {
this.sleepTime = sleepTime;
this.name = name;
this.revenue = revenue;
}
public int getRevenue() {
return revenue;
}
public Publisher<Integer> compute() {
return Single.just("")
.delay(this.sleepTime, TimeUnit.MILLISECONDS)
.map(x-> {
System.out.printf("%s finished in %dms on thread %d%n", this.name, this.sleepTime, Thread.currentThread().getId());
return this.revenue;
}).toFlowable();
}
}
private static int[] DELAYS = {475, 500, 375, 100, 250, 125, 150, 225, 200, 425, 275, 350, 450, 325, 400, 300, 175};
public static void main(String[] args) {
final ArrayList<Office> offices = new ArrayList<>();
for (int i = 0; i < 500; i++) {
offices.add(new Office(DELAYS[i % DELAYS.length], String.format("Office-%d", i), 3 * i + 10));
}
int totest = offices.stream().mapToInt(Office::getRevenue).sum();
final Instant start = Instant.now();
final Flowable<Office> officeObservable = Flowable.fromIterable(offices);
int computation = officeObservable.parallel(2).runOn(Schedulers.io()).flatMap(Office::compute).reduce(Integer::sum).blockingSingle();
boolean verdict = computation == totest;
System.out.println("" + computation + " " + (verdict ? "ok" : "nok"));
final Instant end = Instant.now();
System.out.printf("Ends in %dms%n", Duration.between(start, end).toMillis());
}
}
编辑:我将并行设置为 2,但谁在乎呢,您可以放置一个线程,因为它不是 CPU 限制问题。
经过多次尝试(感谢M.T的帮助),终于Java实现了!
public class Office {
private int sleepTime;
private int revenue;
public Office(int sleepTime, int revenue) {
this.sleepTime = sleepTime;
this.revenue = revenue;
}
public int getRevenue() {
return revenue;
}
public Single<Integer> compute() {
return Single.timer(sleepTime, TimeUnit.MILLISECONDS).map(l -> this.revenue);
}
}
public class Tester {
private static int[] DELAYS = {475, 500, 375, 100, 250, 125, 150, 225, 200, 425, 275, 350, 450, 325, 400, 300, 175};
public static void main(String[] args) {
final ArrayList<Office> offices = new ArrayList<>();
for (int i = 0; i < 1_000_000; i++) {
offices.add(new Office(DELAYS[i % DELAYS.length], 1));
}
int totest = offices.stream().mapToInt(Office::getRevenue).sum();
final Instant start = Instant.now();
final Flowable<Office> officeObservable = Flowable.fromIterable(offices);
int computation = officeObservable.flatMapSingle(Office::compute).reduce(Integer::sum).blockingGet();
boolean verdict = computation == totest;
System.out.println("" + computation + " " + (verdict ? "ok" : "nok"));
final Instant end = Instant.now();
System.out.printf("Ends in %dms%n", Duration.between(start, end).toMillis());
}
}
这段代码速度超快! 1_000_000 个办公室 2 秒!