Java 并发练习。异步下载

Java concurrency exercise. Asynchronous download

我正在做一个关于 Java 并发的练习,使用 wait,通知学习准备考试。 考试要写,所以代码一定要完美,不能尝试编译查错。

这是练习的文本:

总体思路:

这是我生成的代码:

public class Downloader{

    private Queue downloadQueue;
    private HashMap urlData;
    private final static THREADS_NUMBER;

    public Downloader(){
        this.downloadQueue = new Queue();
        this.urlData = new HashMap();
        for(int i = 0; i < THREADS_NUMBER; i++){
            new DownTh(this.downloadQueue, this.urlData).start();
        }
    }

    void syncronized download(String URL){
        downloadQueue.add(url);
        notifyAll();
        return;
    }

    byte[] syncronized getData(String URL){
        while(urlData.get(URL) == null ){
            wait()
        }

        return urlData.get(URL);
    }
}

public class DownTh extend Thread{

    private Queue downloadQueue;
    private HashMap urlData;

    public DownTh(Queue downloadQueue, HashMap urlData){
        this.downloadQueue = downloadQueue
        this.urlData = urlData;
    }

    public void run(){
        while(true){
            syncronized{
                while(queue.isEmpty()){
                    wait()
                }

                String url = queue.remove();
                urlData.add(url, Util.download(url))

                notifyAll()             
            }
        }
    }
}

你能帮我看看这个逻辑对不对吗?

让我们暂时假设 Java 中处理同步的那些伟大的 类 并不存在,因为这是一个合成任务,而你需要处理的只是 sychronized, waitnotify.

第一个简单回答的问题是:"Who is going to wait on what?"

  • 下载线程将等待 URL 下载。
  • 调用方将等待该下载线程的结果。

这详细是什么意思?我们需要至少一个调用者和下载线程(你的urlData)之间的同步元素,也应该有一个数据对象处理下载数据本身以方便,并检查下载是否已经完成.

因此将发生的详细步骤是:

  1. 调用方请求新的下载。
    创建:DownloadResult
    写入:urlData(url -> DownloadResult)
    在 urlData.

  2. 上唤醒 1 个线程
  3. 线程X必须找到数据下载并处理or/then再次入睡
    读取:url数据(先找到未处理的DownloadResult,否则等待在 urlData)
    write: DownloadResult (获取它)
    write: DownloadResult (下载结果)
    notify: anyone waiting on DownloadResult
    repeat

  4. 调用方必须能够异步 check/wait 下载结果。
    读取:url数据
    读取:下载结果(等待下载结果如果需要)

由于这些对象上存在来自不同线程的读取和写入,因此在访问对象 urlData 或 DownloadResult 时需要同步。

还会有一个wait/notify关联:

  • 调用方 -> url数据 -> DownTh
  • DownTh -> 下载结果 -> 调用者

经过仔细分析,以下代码可以满足要求:

public class DownloadResult {

  protected final URL url; // this is for convenience
  protected boolean inProgress;
  protected byte[] result;

  public DownloadResult(final URL url) {
    this.url = url;
    this.inProgress = false;
  }

  /* Try to lock this against tother threads if not already acquired. */
  public synchronized boolean acquire() {
    if (this.inProgress == false) {
      this.inProgress = true;
      return true;
    } else {
      return false;
    }
  }

  public void download() {
    final byte[] downloadedBytes = Util.download(this.url); // note how this is done outside the synchronized block to avoid unnecessarily long blockings
    synchronized (this) {
      this.result = downloadedBytes;
      this.notifyAll(); // wake-up ALL callers
    }
  }

  public synchronized byte[] getResult() throws InterruptedException {
    while (this.result == null) {
      this.wait();
    }
    return this.result;
  }

}

protected class DownTh extends Thread {

  protected final Map<URL, DownloadResult> urlData;

  public DownTh(final Map<URL, DownloadResult> urlData) {
    this.urlData = urlData;
    this.setDaemon(true); // this allows the JVM to shut down despite DownTh threads still running
  }

  protected DownloadResult getTask() {
    for (final DownloadResult downloadResult : urlData.values()) {
      if (downloadResult.acquire()) {
        return downloadResult;
      }
    }
    return null;
  }

  @Override
  public void run() {
    DownloadResult downloadResult;
    try {
      while (true) {
        synchronized (urlData) {
          while ((downloadResult = this.getTask()) == null) {
            urlData.wait();
          }
        }
        downloadResult.download();
      }
    } catch (InterruptedException ex) {
      // can be ignored
    } catch (Error e) {
      // log here
    }
  }
}

public class Downloader {

  protected final Map<URL, DownloadResult> urlData = new HashMap<>();

  // insert constructor that creates the threads here

  public DownloadResult download(final URL url) {
    final DownloadResult result = new DownloadResult(url);
    synchronized (urlData) {
      urlData.putIfAbsent(url, result);
      urlData.notify(); // only one thread needs to wake up
    }
    return result;
  }

  public byte[] getData(final URL url) throws InterruptedException {
    DownloadResult result;
    synchronized (urlData) {
      result = urlData.get(url);
    }
    if (result != null) {
      return result.getResult();
    } else {
      throw new IllegalStateException("URL " + url + " not requested.");
    }
  }
}

实际上 Java 事情会有所不同,通过使用并发 类 and/or 原子... 类,所以这仅用于教育目的。如需进一步阅读,请参阅 "Callable Future"。