Java 并发练习。异步下载
Java concurrency exercise. Asynchronous download
我正在做一个关于 Java 并发的练习,使用 wait,通知学习准备考试。
考试要写,所以代码一定要完美,不能尝试编译查错。
这是练习的文本:
总体思路:
- 当下载器被实例化时,队列和散列图被创建并传递给所有线程。 (共享数据)
- 下载方法将 url 添加到队列并调用 notifyAll 唤醒下载程序线程。
- getData 方法会一直等待,直到提供的 url 的散列图中有数据。当数据可用时,它 returns 给调用者。
- 下载线程运行无限循环。它一直等到 url 出现在队列中。当它收到 url 时,它会下载它并将字节放入 hashmap 中调用 notifyAll 以唤醒可能正在等待 getData 方法的用户。
这是我生成的代码:
public class Downloader{
private Queue downloadQueue;
private HashMap urlData;
private final static THREADS_NUMBER;
public Downloader(){
this.downloadQueue = new Queue();
this.urlData = new HashMap();
for(int i = 0; i < THREADS_NUMBER; i++){
new DownTh(this.downloadQueue, this.urlData).start();
}
}
void syncronized download(String URL){
downloadQueue.add(url);
notifyAll();
return;
}
byte[] syncronized getData(String URL){
while(urlData.get(URL) == null ){
wait()
}
return urlData.get(URL);
}
}
public class DownTh extend Thread{
private Queue downloadQueue;
private HashMap urlData;
public DownTh(Queue downloadQueue, HashMap urlData){
this.downloadQueue = downloadQueue
this.urlData = urlData;
}
public void run(){
while(true){
syncronized{
while(queue.isEmpty()){
wait()
}
String url = queue.remove();
urlData.add(url, Util.download(url))
notifyAll()
}
}
}
}
你能帮我看看这个逻辑对不对吗?
让我们暂时假设 Java 中处理同步的那些伟大的 类 并不存在,因为这是一个合成任务,而你需要处理的只是 sychronized
, wait
和 notify
.
第一个简单回答的问题是:"Who is going to wait on what?"
- 下载线程将等待 URL 下载。
- 调用方将等待该下载线程的结果。
这详细是什么意思?我们需要至少一个调用者和下载线程(你的urlData
)之间的同步元素,也应该有一个数据对象处理下载数据本身以方便,并检查下载是否已经完成.
因此将发生的详细步骤是:
调用方请求新的下载。
创建:DownloadResult
写入:urlData(url -> DownloadResult)
在 urlData.
上唤醒 1 个线程
线程X必须找到数据下载并处理or/then再次入睡
读取:url数据(先找到未处理的DownloadResult,否则等待在 urlData)
write: DownloadResult (获取它)
write: DownloadResult (下载结果)
notify: anyone waiting on DownloadResult
repeat
调用方必须能够异步 check/wait 下载结果。
读取:url数据
读取:下载结果(等待下载结果如果需要)
由于这些对象上存在来自不同线程的读取和写入,因此在访问对象 urlData 或 DownloadResult 时需要同步。
还会有一个wait/notify关联:
- 调用方 -> url数据 -> DownTh
- DownTh -> 下载结果 -> 调用者
经过仔细分析,以下代码可以满足要求:
public class DownloadResult {
protected final URL url; // this is for convenience
protected boolean inProgress;
protected byte[] result;
public DownloadResult(final URL url) {
this.url = url;
this.inProgress = false;
}
/* Try to lock this against tother threads if not already acquired. */
public synchronized boolean acquire() {
if (this.inProgress == false) {
this.inProgress = true;
return true;
} else {
return false;
}
}
public void download() {
final byte[] downloadedBytes = Util.download(this.url); // note how this is done outside the synchronized block to avoid unnecessarily long blockings
synchronized (this) {
this.result = downloadedBytes;
this.notifyAll(); // wake-up ALL callers
}
}
public synchronized byte[] getResult() throws InterruptedException {
while (this.result == null) {
this.wait();
}
return this.result;
}
}
protected class DownTh extends Thread {
protected final Map<URL, DownloadResult> urlData;
public DownTh(final Map<URL, DownloadResult> urlData) {
this.urlData = urlData;
this.setDaemon(true); // this allows the JVM to shut down despite DownTh threads still running
}
protected DownloadResult getTask() {
for (final DownloadResult downloadResult : urlData.values()) {
if (downloadResult.acquire()) {
return downloadResult;
}
}
return null;
}
@Override
public void run() {
DownloadResult downloadResult;
try {
while (true) {
synchronized (urlData) {
while ((downloadResult = this.getTask()) == null) {
urlData.wait();
}
}
downloadResult.download();
}
} catch (InterruptedException ex) {
// can be ignored
} catch (Error e) {
// log here
}
}
}
public class Downloader {
protected final Map<URL, DownloadResult> urlData = new HashMap<>();
// insert constructor that creates the threads here
public DownloadResult download(final URL url) {
final DownloadResult result = new DownloadResult(url);
synchronized (urlData) {
urlData.putIfAbsent(url, result);
urlData.notify(); // only one thread needs to wake up
}
return result;
}
public byte[] getData(final URL url) throws InterruptedException {
DownloadResult result;
synchronized (urlData) {
result = urlData.get(url);
}
if (result != null) {
return result.getResult();
} else {
throw new IllegalStateException("URL " + url + " not requested.");
}
}
}
实际上 Java 事情会有所不同,通过使用并发 类 and/or 原子... 类,所以这仅用于教育目的。如需进一步阅读,请参阅 "Callable Future"。
我正在做一个关于 Java 并发的练习,使用 wait,通知学习准备考试。 考试要写,所以代码一定要完美,不能尝试编译查错。
这是练习的文本:
- 当下载器被实例化时,队列和散列图被创建并传递给所有线程。 (共享数据)
- 下载方法将 url 添加到队列并调用 notifyAll 唤醒下载程序线程。
- getData 方法会一直等待,直到提供的 url 的散列图中有数据。当数据可用时,它 returns 给调用者。
- 下载线程运行无限循环。它一直等到 url 出现在队列中。当它收到 url 时,它会下载它并将字节放入 hashmap 中调用 notifyAll 以唤醒可能正在等待 getData 方法的用户。
这是我生成的代码:
public class Downloader{
private Queue downloadQueue;
private HashMap urlData;
private final static THREADS_NUMBER;
public Downloader(){
this.downloadQueue = new Queue();
this.urlData = new HashMap();
for(int i = 0; i < THREADS_NUMBER; i++){
new DownTh(this.downloadQueue, this.urlData).start();
}
}
void syncronized download(String URL){
downloadQueue.add(url);
notifyAll();
return;
}
byte[] syncronized getData(String URL){
while(urlData.get(URL) == null ){
wait()
}
return urlData.get(URL);
}
}
public class DownTh extend Thread{
private Queue downloadQueue;
private HashMap urlData;
public DownTh(Queue downloadQueue, HashMap urlData){
this.downloadQueue = downloadQueue
this.urlData = urlData;
}
public void run(){
while(true){
syncronized{
while(queue.isEmpty()){
wait()
}
String url = queue.remove();
urlData.add(url, Util.download(url))
notifyAll()
}
}
}
}
你能帮我看看这个逻辑对不对吗?
让我们暂时假设 Java 中处理同步的那些伟大的 类 并不存在,因为这是一个合成任务,而你需要处理的只是 sychronized
, wait
和 notify
.
第一个简单回答的问题是:"Who is going to wait on what?"
- 下载线程将等待 URL 下载。
- 调用方将等待该下载线程的结果。
这详细是什么意思?我们需要至少一个调用者和下载线程(你的urlData
)之间的同步元素,也应该有一个数据对象处理下载数据本身以方便,并检查下载是否已经完成.
因此将发生的详细步骤是:
调用方请求新的下载。
创建:DownloadResult
写入:urlData(url -> DownloadResult)
在 urlData. 上唤醒 1 个线程
线程X必须找到数据下载并处理or/then再次入睡
读取:url数据(先找到未处理的DownloadResult,否则等待在 urlData)
write: DownloadResult (获取它)
write: DownloadResult (下载结果)
notify: anyone waiting on DownloadResult
repeat调用方必须能够异步 check/wait 下载结果。
读取:url数据
读取:下载结果(等待下载结果如果需要)
由于这些对象上存在来自不同线程的读取和写入,因此在访问对象 urlData 或 DownloadResult 时需要同步。
还会有一个wait/notify关联:
- 调用方 -> url数据 -> DownTh
- DownTh -> 下载结果 -> 调用者
经过仔细分析,以下代码可以满足要求:
public class DownloadResult {
protected final URL url; // this is for convenience
protected boolean inProgress;
protected byte[] result;
public DownloadResult(final URL url) {
this.url = url;
this.inProgress = false;
}
/* Try to lock this against tother threads if not already acquired. */
public synchronized boolean acquire() {
if (this.inProgress == false) {
this.inProgress = true;
return true;
} else {
return false;
}
}
public void download() {
final byte[] downloadedBytes = Util.download(this.url); // note how this is done outside the synchronized block to avoid unnecessarily long blockings
synchronized (this) {
this.result = downloadedBytes;
this.notifyAll(); // wake-up ALL callers
}
}
public synchronized byte[] getResult() throws InterruptedException {
while (this.result == null) {
this.wait();
}
return this.result;
}
}
protected class DownTh extends Thread {
protected final Map<URL, DownloadResult> urlData;
public DownTh(final Map<URL, DownloadResult> urlData) {
this.urlData = urlData;
this.setDaemon(true); // this allows the JVM to shut down despite DownTh threads still running
}
protected DownloadResult getTask() {
for (final DownloadResult downloadResult : urlData.values()) {
if (downloadResult.acquire()) {
return downloadResult;
}
}
return null;
}
@Override
public void run() {
DownloadResult downloadResult;
try {
while (true) {
synchronized (urlData) {
while ((downloadResult = this.getTask()) == null) {
urlData.wait();
}
}
downloadResult.download();
}
} catch (InterruptedException ex) {
// can be ignored
} catch (Error e) {
// log here
}
}
}
public class Downloader {
protected final Map<URL, DownloadResult> urlData = new HashMap<>();
// insert constructor that creates the threads here
public DownloadResult download(final URL url) {
final DownloadResult result = new DownloadResult(url);
synchronized (urlData) {
urlData.putIfAbsent(url, result);
urlData.notify(); // only one thread needs to wake up
}
return result;
}
public byte[] getData(final URL url) throws InterruptedException {
DownloadResult result;
synchronized (urlData) {
result = urlData.get(url);
}
if (result != null) {
return result.getResult();
} else {
throw new IllegalStateException("URL " + url + " not requested.");
}
}
}
实际上 Java 事情会有所不同,通过使用并发 类 and/or 原子... 类,所以这仅用于教育目的。如需进一步阅读,请参阅 "Callable Future"。