How to generate PDF documents from rpt in a multi-threaded approach?

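I have an rpt file that I will use to generate multiple reports in PDF format, using the Engine class from i-net Clear Reports. The process takes a very long time because I am generating nearly 10000 reports. Can I use multi-threading or some other approach to speed it up?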

Any help on how this could be done would be appreciated.

Part of my code:

 //Loops
 Engine eng = new Engine(Engine.EXPORT_PDF);
 eng.setReportFile(rpt); //rpt is the report name
 if (cn == null || cn.isClosed()) { // check for null first, otherwise isClosed() throws a NullPointerException
    cn = ds.getConnection();
 }
 eng.setConnection(cn);
 System.out.println(" After set connection");
 eng.setPrompt(data[i], 0);
 ReportProperties repprop = eng.getReportProperties();
 repprop.setPaperOrient(ReportProperties.DEFAULT_PAPER_ORIENTATION, ReportProperties.PAPER_FANFOLD_US);
 eng.execute();
 System.out.println(" After execute");
 try {
      PDFExportThread pdfExporter = new PDFExportThread(eng, sFileName, sFilePath);
      pdfExporter.execute();
 } catch (Exception e) {
      e.printStackTrace();
 }

PDFExportThread's execute method:

 public void execute() throws IOException {
      FileOutputStream fos = null;
      try {
           String FileName = sFileName + "_" + (eng.getPageCount() - 1);
           File file = new File(sFilePath + FileName + ".pdf");
           if (!file.getParentFile().exists()) {
                file.getParentFile().mkdirs();
           }
           if (!file.exists()) {
                file.createNewFile();
           }
           fos = new FileOutputStream(file);
           for (int k = 1; k <= eng.getPageCount(); k++) {
                fos.write(eng.getPageData(k));
           }
           fos.flush();
           fos.close();
      } catch (Exception e) {
           e.printStackTrace();
      } finally {
           if (fos != null) {
                fos.close();
                fos = null;
           }
      }
 }

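I'll offer this "answer" as a possible quick-and-dirty solution to get you started on parallelizing the work.

One way or another you are going to build a render farm. I don't think there is a trivial way to do this in Java; I would love for someone to post an answer that shows how to parallelize your example in just a few lines of code. Until then, this will hopefully help you make some progress.

You are going to get limited scaling within a single JVM instance. But... let's see how far you get and whether it helps enough.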

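Design challenge #1: Restarting.

You will probably want a place to keep the status of each report, i.e. of each "unit of work".

You want this in case you need to restart everything (maybe your server crashes) and you don't want to re-run all of the reports completed so far.

There are many ways to do this: a database, or checking whether a "completed" file exists in your reports folder. It is not enough for the *.pdf to exist, since it may be incomplete. For xyz_200.pdf you could create an empty xyz_200.done or xyz_200.err file to help with re-running any problem children. By the time you have coded up that file manipulation/checking/initialization logic, though, it may well have been easier to add a column to the database that holds the list of work to be done.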
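As a rough illustration of the marker-file idea, here is a minimal sketch. The class name `ReportMarkers` and its methods are hypothetical helpers made up for this example; they are not part of the question's code or of the reporting library. It assumes one output folder and one marker per report name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Tracks per-report completion with empty marker files next to the PDFs. */
class ReportMarkers {
    private final Path outputDir;

    ReportMarkers(Path outputDir) {
        this.outputDir = outputDir;
    }

    /** True only when a .done marker exists; a bare .pdf may be half-written. */
    boolean isComplete(String reportName) {
        return Files.exists(outputDir.resolve(reportName + ".done"));
    }

    /** Records success: removes any stale .err marker, then creates .done. */
    void markDone(String reportName) throws IOException {
        Files.deleteIfExists(outputDir.resolve(reportName + ".err"));
        touch(outputDir.resolve(reportName + ".done"));
    }

    /** Records failure so that a re-run can find this report again. */
    void markError(String reportName) throws IOException {
        touch(outputDir.resolve(reportName + ".err"));
    }

    private void touch(Path marker) throws IOException {
        Files.createDirectories(outputDir);
        if (!Files.exists(marker)) {
            Files.createFile(marker);
        }
    }

    public static void main(String[] args) throws IOException {
        ReportMarkers markers = new ReportMarkers(Files.createTempDirectory("reports"));
        System.out.println(markers.isComplete("xyz_200")); // false, nothing has run yet
        markers.markDone("xyz_200");
        System.out.println(markers.isComplete("xyz_200")); // true
    }
}
```

Something like `isComplete()` could back the `complete( data[i] )` check in the main loop, with `markDone()` called only after the PDF has been fully written and closed.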

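Design consideration #2: Maximizing throughput (avoiding overload).

You don't want to saturate your system by running a thousand reports in parallel. Maybe 10.
Maybe 100.
Probably not 5,000.
You will need to do some sizing research to see what gets you to roughly 80% to 90% system utilization.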
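One cheap way to start that sizing research is to time the same small batch at several pool sizes and watch where the gains flatten out. The sketch below is only an illustration: `PoolSizing` and `timeBatch` are made-up names, and `Thread.sleep` stands in for generating one report, so real numbers will depend on your database, disk, and the report engine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PoolSizing {
    /** Runs the given tasks on a pool of the given size; returns elapsed milliseconds. */
    static long timeBatch(int poolSize, List<Callable<Void>> tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        long start = System.nanoTime();
        try {
            // invokeAll blocks until every task has finished
            for (Future<Void> f : pool.invokeAll(tasks)) {
                f.get(); // rethrows a task failure as ExecutionException
            }
        } finally {
            pool.shutdown();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws Exception {
        List<Callable<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            // placeholder for "generate one report"
            tasks.add(() -> { Thread.sleep(50); return null; });
        }
        for (int size : new int[] {1, 2, 4, 8}) {
            System.out.println(size + " threads: " + timeBatch(size, tasks) + " ms");
        }
    }
}
```

With real report tasks, watch CPU, database, and disk load while this runs; elapsed time alone will not tell you which resource saturates first.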

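Design consideration #3: Scaling across multiple servers.

Overly complex, and outside the scope of a Stack Exchange answer. You would have to spin up JVMs on multiple systems, each running something like the workers below, plus a report manager that pulls work items from a shared "queue" structure. Again, a database table is probably easier here than doing something file-based (or a network feed).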

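Sample code

Caveat: None of this code is well tested; it almost certainly has an abundance of typos, logic errors, and poor design. Use at your own risk.

So anyway... I want to give you the basic idea of a rudimentary task runner. Replace the "// Loops" example in the question with code like the following:

Main loop (original code example)

This more or less does what your example code did, modified to push most of the work into ReportWorker (a new class, see below). A lot of stuff seems to be packed into the "// Loops" example in your original question, so I am not trying to reverse engineer it.

FWIW, it wasn't clear to me where "rpt" and "data[i]" come from, so I hacked up some test data.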

public class Main {

   public static boolean complete( String data ) {
      return false; // for testing nothing is complete.
   }

    public static void main(String args[] ) {

    String data[] = new String[] { 
         "A",
         "B",
         "C",
         "D",
         "E" };
    String rpt = "xyz";

    // Loop
    ReportManager reportMgr = new ReportManager();  // a new helper class (see below), it assigns/monitors work.
    long startTime = System.currentTimeMillis();
    for( int i = 0; i < data.length; ++i ) {
       // complete is something you should write that knows if a report "unit of  work"
       // finished successfully.
       if( !complete( data[i] ) ) {
          reportMgr.assignWork(  rpt, data[i] ); // so... where did values for your "rpt" variable come from?
       }
    }
    reportMgr.waitForWorkToFinish(); // out of new work to assign, let's wait until everything in-flight complete.
    long endTime = System.currentTimeMillis();
    System.out.println("Done.  Elapsed time = " + (endTime - startTime)/1000 +" seconds.");

   }

}

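ReportManager

This class is not thread safe. Just have your original loop keep calling assignWork() until you are out of reports to assign, then call waitForWorkToFinish(), as shown above, to block until all of the work is done. (FWIW, I don't think you can say any of the classes here are especially thread safe.)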

public class ReportManager {

   public int polling_delay = 500; // wait 0.5 seconds for testing.
   //public int polling_delay = 60 * 1000; // wait 1 minute.
   // not high throughput millions of reports / second, we'll run at a slower tempo.
   public int nWorkers = 3; // just 3 for testing.
   public int assignedCnt = 0;
   public ReportWorker workers[];

   public ReportManager() {
      // initialize our manager.
      workers = new ReportWorker[ nWorkers ];
      for( int i = 0; i < nWorkers; ++i ) {
         workers[i] = new ReportWorker( i );
         System.out.println("Created worker #"+i);
      }
   }

   private ReportWorker handleWorkerError( int i  ) {
      // something went wrong, update our "report" status as one of the reports failed.
      System.out.println("handleWorkerError(): failure in "+workers[i]+", resetting worker.");
      workers[i].teardown();
      workers[i] = new ReportWorker( i ); // just replace everything.
      return workers[i]; // the new worker will, incidentally, be available.
   }

   private ReportWorker handleWorkerComplete( int i ) {
      // this unit of work was completed, update our "report" status tracker as success.
      System.out.println("handleWorkerComplete(): success in "+workers[i]+", resetting worker.");
      workers[i].teardown();
      workers[i] = new ReportWorker( i ); // just replace everything.
      return workers[i]; // the new worker will, incidentally, be available.
   }

   private int activeWorkerCount() {
      int activeCnt = 0;
      for( int i = 0; i < nWorkers; ++i ) {
         ReportWorker worker = workers[i];
         System.out.println("activeWorkerCount() i="+i+", checking worker="+worker);
         if( worker.hasError() ) {
            worker = handleWorkerError( i );
         }
         if( worker.isComplete() ) {
            worker = handleWorkerComplete( i );
         }
         if( worker.isInitialized() || worker.isRunning() ) {
            ++activeCnt;
         }
      }
      System.out.println("activeWorkerCount() activeCnt="+activeCnt);
      return activeCnt;
   }

   private ReportWorker getAvailableWorker() {
      // check each worker to see if anybody recently completed...
      // This (rather lazily) creates completely new ReportWorker instances.
      // You might want to try pooling (salvaging and reinitializing them)
      // to see if that helps your performance.

      System.out.println("\n-----");
      ReportWorker firstAvailable = null;
      for( int i = 0; i < nWorkers; ++i ) {
         ReportWorker worker = workers[i];
         System.out.println("getAvailableWorker(): i="+i+" worker="+worker);
         if( worker.hasError() ) {
            worker = handleWorkerError( i );
         }
         if( worker.isComplete() ) {
            worker = handleWorkerComplete( i );
         }
         if( worker.isAvailable() && firstAvailable==null ) {
            System.out.println("Apparently worker "+worker+" is 'available'");
            firstAvailable  = worker;
            System.out.println("getAvailableWorker(): i="+i+" now firstAvailable = "+firstAvailable);
         }
      }
      return firstAvailable;  // May (or may not) be null.
   }

   public void assignWork(  String rpt, String data ) {
      ReportWorker worker = getAvailableWorker();
      while( worker == null ) {
         System.out.println("assignWork: No workers available, sleeping for "+polling_delay);
         try { Thread.sleep( polling_delay ); }
         catch( InterruptedException e ) { System.out.println("assignWork: sleep interrupted, ignoring exception "+e); }
         // any workers available now?
         worker = getAvailableWorker();
      }
      ++assignedCnt;
      worker.initialize( rpt, data ); // or whatever else you need.
      System.out.println("assignment #"+assignedCnt+" given to "+worker);
      Thread t = new Thread( worker );
      t.start( ); // that is pretty much it, let it go.
   }

   public void waitForWorkToFinish() {
      int active = activeWorkerCount();
      while( active >= 1 ) {
         System.out.println("waitForWorkToFinish(): #active workers="+active+", waiting...");
         // wait a minute....
         try { Thread.sleep( polling_delay ); }
         catch( InterruptedException e ) { System.out.println("waitForWorkToFinish: sleep interrupted, ignoring exception "+e); }
         active = activeWorkerCount();
      }
   }
}

ReportWorker

public class ReportWorker implements Runnable {
      int test_delay = 10*1000; //sleep for 10 seconds.
      // (actual code would be generating PDF output)

      public enum StatusCodes { UNINITIALIZED,
          INITIALIZED,
          RUNNING,
          COMPLETE,
          ERROR };


      int id = -1;
      volatile StatusCodes status = StatusCodes.UNINITIALIZED; // volatile: set by the worker thread, polled by the manager thread
      boolean initialized = false;
      public String rpt = "";
      public String data = "";
      //Engine eng;
      //PDFExportThread pdfExporter;
      //DataSource_type cn;

      public boolean isInitialized() { return initialized; }
      public boolean isAvailable()   { return status == StatusCodes.UNINITIALIZED; }
      public boolean isRunning()     { return status == StatusCodes.RUNNING; }
      public boolean isComplete()    { return status == StatusCodes.COMPLETE; }
      public boolean hasError()      { return status == StatusCodes.ERROR; }


      public ReportWorker( int id ) {
          this.id = id;
      }

      public String toString( ) {
         return "ReportWorker."+id+"("+status+")/"+rpt+"/"+data;
      }

      // the example code doesn't make clear if there is a relationship between rpt & data[i].
      public void initialize( String rpt, String data /* data[i] in original code */  ) {
         try {
            this.rpt = rpt;
            this.data = data;
            /* uncomment this part where you have the various classes available.
             * I have it commented out for testing.
            cn = ds.getConnection();
            eng = new Engine(Engine.EXPORT_PDF); // assign the field (not a local) so run() can use it
            eng.setReportFile(rpt); //rpt is the report name
            eng.setConnection(cn);
            eng.setPrompt(data, 0);
            ReportProperties repprop = eng.getReportProperties();
            repprop.setPaperOrient(ReportProperties.DEFAULT_PAPER_ORIENTATION, ReportProperties.PAPER_FANFOLD_US);
            */
            status = StatusCodes.INITIALIZED;
            initialized = true; // want this true even if we're running.
         } catch( Exception e ) {
            status = StatusCodes.ERROR;
            throw new RuntimeException("initialize(rpt="+rpt+", data="+data+")", e);
         }
      }

      public void run() {
         status = StatusCodes.RUNNING;
         System.out.println("run().BEGIN: "+this);
         try {
            // delay for testing.
            try { Thread.sleep( test_delay ); }
            catch( InterruptedException e ) { System.out.println(this+".run(): test interrupted, ignoring "+e); }
            /* uncomment this part where you have the various classes available.
             * I have it commented out for testing.
            eng.execute();
            PDFExportThread pdfExporter = new PDFExportThread(eng, sFileName, sFilePath);
            pdfExporter.execute();
            */
            status = StatusCodes.COMPLETE;
            System.out.println("run().END: "+this);
         } catch( Exception e ) {
            System.out.println("run().ERROR: "+this);
            status = StatusCodes.ERROR;
            throw new RuntimeException("run(rpt="+rpt+", data="+data+")", e);
         }
      }

      public void teardown() {
         if( ! isInitialized() || isRunning() ) {
            System.out.println("Warning: ReportWorker.teardown() called but I am uninitialized or running.");
            // should never happen, fatal enough to throw an exception?
         }

         /* commented out for testing.
           try { cn.close(); } 
           catch( Exception e ) { System.out.println("Warning: ReportWorker.teardown() ignoring error on connection close: "+e); }
           cn = null;
         */
         // any need to close things on eng?
         // any need to close things on pdfExporter?
      }
}

This is very basic code. A ThreadPoolExecutor with a fixed number of threads in its pool is the backbone.

A few considerations:

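  1. The thread pool size should be equal to or less than the DB connection pool size. It should also be a number that is reasonable for running engines in parallel.
  2. The main thread should wait long enough before killing all the threads. I have used 1 hour as the wait time, but that is just an example.
  3. You will need to do proper exception handling.
  4. In the API docs I saw stopAll and shutdown methods in the Engine class, so I invoke them as soon as our work is done. Again, this is just an example.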
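To make points 2 and 3 a little more concrete, here is a hedged sketch of one way to bound the wait and find out which jobs failed. `BatchRunner` and `runAll` are hypothetical names invented for this illustration, and the jobs are plain `Callable` stand-ins rather than real report exports. It relies only on standard `java.util.concurrent` calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class BatchRunner {
    /** Runs every job on a fixed pool and returns the indexes of jobs that did not succeed. */
    static List<Integer> runAll(List<Callable<Void>> jobs, int poolSize,
                                long timeout, TimeUnit unit) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        List<Future<Void>> futures = new ArrayList<>();
        for (Callable<Void> job : jobs) {
            futures.add(pool.submit(job));
        }
        pool.shutdown(); // accept no new jobs; already queued ones still run
        if (!pool.awaitTermination(timeout, unit)) {
            pool.shutdownNow(); // timed out: interrupt running jobs, drop queued ones
        }
        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            Future<Void> f = futures.get(i);
            if (!f.isDone()) {
                failed.add(i); // never started, or still unwinding after the timeout
                continue;
            }
            try {
                f.get(); // returns at once because the job is done
            } catch (ExecutionException e) {
                failed.add(i); // the job threw; e.getCause() holds the original exception
            }
        }
        return failed;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<Void>> jobs = new ArrayList<>();
        jobs.add(() -> null);
        jobs.add(() -> { throw new IllegalStateException("simulated export failure"); });
        jobs.add(() -> null);
        System.out.println("Failed job indexes: " + runAll(jobs, 2, 5, TimeUnit.SECONDS)); // [1]
    }
}
```

Using `submit` instead of `execute` keeps each job's exception inside its `Future`, so a failed report can be logged or retried instead of only printing a stack trace from a pool thread.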

Hope this helps.


import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.sql.Connection;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RunEngine {
    public static void main(String[] args) throws Exception {
        final String rpt = "/tmp/rpt/input/rpt-1.rpt";
        final String sFilePath = "/tmp/rpt/output/";
        final String sFileName = "pdfreport";
        final Object[] data = new Object[10];

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
        for (int i = 0; i < data.length; i++) {
            PDFExporterRunnable runnable = new PDFExporterRunnable(rpt, data[i], sFilePath, sFileName, i);
            executor.execute(runnable);
        }
        executor.shutdown();
        executor.awaitTermination(1L, TimeUnit.HOURS);
        Engine.stopAll();
        Engine.shutdown();
    }
    private static class PDFExporterRunnable implements Runnable {
        private final String rpt;
        private final Object data;
        private final String sFilePath;
        private final String sFileName;
        private final int runIndex;


        public PDFExporterRunnable(String rpt, Object data, String sFilePath,
                String sFileName, int runIndex) {
            this.rpt = rpt;
            this.data = data;
            this.sFilePath = sFilePath;
            this.sFileName = sFileName;
            this.runIndex = runIndex;
        }

        @Override
        public void run() {
            // Loops
            Engine eng = new Engine(Engine.EXPORT_PDF);
            eng.setReportFile(rpt); // rpt is the report name
            Connection cn = null;

            /*
             * DB connection related code. Check and use.
             */
            //if (cn == null || cn.isClosed()) {
                //cn = ds.getConnection();
            //}
            eng.setConnection(cn);
            System.out.println(" After set connection");

            eng.setPrompt(data, 0);
            ReportProperties repprop = eng.getReportProperties();
            repprop.setPaperOrient(ReportProperties.DEFAULT_PAPER_ORIENTATION,
                    ReportProperties.PAPER_FANFOLD_US);
            eng.execute();
            System.out.println(" After execute");
            FileOutputStream fos = null;
            try {
                String FileName = sFileName + "_" + runIndex;
                File file = new File(sFilePath + FileName + ".pdf");
                if (!file.getParentFile().exists()) {
                    file.getParentFile().mkdirs();
                }
                if (!file.exists()) {
                    file.createNewFile();
                }
                fos = new FileOutputStream(file);
                for (int k = 1; k <= eng.getPageCount(); k++) {
                    fos.write(eng.getPageData(k));
                }
                fos.flush();
                fos.close();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (fos != null) {
                    try {
                        fos.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    fos = null;
                }
            }
        }
    }
    /*
     * Dummy classes to avoid compilation errors.
     */
    private static class ReportProperties {
        public static final String PAPER_FANFOLD_US = null;
        public static final String DEFAULT_PAPER_ORIENTATION = null;
        public void setPaperOrient(String defaultPaperOrientation, String paperFanfoldUs) {
        }
    }

    private static class Engine {
        public static final int EXPORT_PDF = 1;
        public Engine(int exportType) {
        }
        public static void shutdown() {
        }
        public static void stopAll() {
        }
        public void setPrompt(Object singleData, int i) {
        }
        public byte[] getPageData(int k) {
            return null;
        }
        public int getPageCount() {
            return 0;
        }
        public void execute() {
        }
        public ReportProperties getReportProperties() {
            return new ReportProperties(); // non-null so the dummy run() does not throw a NullPointerException
        }
        public void setConnection(Connection cn) {
        }
        public void setReportFile(String reportFile) {
        }
    }
}