如何在 jgroups 中多播大文件

How to multicast large files in jgroups

假设我有一个相对较大的文件(大约 100MB),我想将其多播给集群的所有成员。如何使用 jgroups 以块的形式发送文件(最好有代码演示)?该文件应该在接收端以块的形式读取。另外,我如何确保在接收方维护块的顺序。

编辑 1:到目前为止,这是我尝试过的做法。我只是把文件作为一个整体发送,并在接收方把收到的内容写入一个临时文件。

    /**
     * First attempt: multicasts one file to the cluster as a single message and, on the
     * receiving side, writes the payload of every received message to a temporary file.
     */
    public class SimpleFileTransfer extends ReceiverAdapter {

        JChannel channel;

        /**
         * Connects to the "FileCluster" cluster, sends the file once and closes the channel.
         *
         * @throws Exception if the channel cannot be created or connected, or the file cannot be read
         */
        private void start() throws Exception {
            channel = new JChannel();
            channel.setReceiver(this);
            channel.connect("FileCluster");
            try {
                File file = new File("/res/test.txt"); //the file to be sent
                eventLoop(file);
            }
            finally {
                // close the channel even when reading or sending failed
                channel.close();
            }
        }

        /**
         * Reads the whole file into memory and multicasts it as one message.
         *
         * @param file the file to send
         * @throws IOException if the file cannot be read or is too large for a single byte array
         */
        private void eventLoop(File file) throws IOException {
            long length = file.length();
            if (length > Integer.MAX_VALUE) {
                throw new IOException("file too large to send as one message: " + file + " (" + length + " bytes)");
            }

            // The payload has to be the file's bytes; a Reader/stream object cannot be shipped in a message.
            byte[] data = new byte[(int) length];
            int filled = 0;
            try (FileInputStream in = new FileInputStream(file)) {
                while (filled < data.length) {
                    int n = in.read(data, filled, data.length - filled);
                    if (n == -1) {
                        break; // file is shorter than reported; send what was read
                    }
                    filled += n;
                }
            }

            try {
                Message msg = new Message(null, data, 0, filled); // null destination: send to all members
                channel.send(msg);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Writes the payload of the received message to /res/temp.txt, replacing previous content.
         *
         * @param msg the received message
         */
        public void receive(Message msg) {
            File temp = new File("/res/temp.txt");
            // Byte stream (not a Writer) so the content is stored unchanged; try-with-resources
            // flushes and closes the file, otherwise buffered data may never reach the disk.
            try (java.io.FileOutputStream out = new java.io.FileOutputStream(temp)) {
                byte[] payload = msg.getBuffer();
                if (payload != null) {
                    out.write(payload);
                }
            }
            catch (IOException ie) {
                ie.printStackTrace();
            }
        }

    }

没有人会为您编写代码,但是:

  1. 将文件转为字节数组
  2. 将数组分成块
  3. 将每一块包裹在一个信封中,上面写着它是哪一块
  4. 发送块
  5. 读取信封,再按顺序把各块重新拼起来

这些事情单独来看都不难。

下面是一个简陋的(poor man's)解决方案。要运行它,配置中需要设置 bundler_type="sender-sends"(在 UDP 协议中),并且应用程序需要有足够的内存。这个解决方案并不好,因为它把整个文件读入一个缓冲区,而该缓冲区在 JGroups 内部还会再被复制一次。我接下来给出的解决方案更好,因为它把大文件分成多个较小的块。请注意,发送大文件时 JGroups 内部也会进行分块(分片,fragmentation),但您仍然必须在应用程序层面创建那个巨大的 byte[] 缓冲区,这并不好。

/**
 * Simple whole-file transfer: each time enter is pressed, the complete file is read into one
 * buffer and multicast as a single message. Every received message is written to /tmp/temp.txt.
 */
public class SimpleFileTransfer extends ReceiverAdapter {
    protected String   filename;
    protected JChannel channel;

    /**
     * Creates and connects the channel, then runs the send loop.
     *
     * @param name     the logical name given to the channel
     * @param filename the file to send on every key press
     * @throws Exception if the channel cannot be set up or sending fails
     */
    private void start(String name, String filename) throws Exception {
        this.filename=filename;
        channel=new JChannel("/home/bela/fast.xml").name(name);
        channel.setReceiver(this);
        channel.connect("FileCluster");
        try {
            eventLoop();
        }
        finally {
            // the loop only ends by throwing, so this is the only place the channel gets closed
            channel.close();
        }
    }

    /** Waits for a key press and sends the file, forever. */
    private void eventLoop() throws Exception {
        while(true) {
            Util.keyPress(String.format("<enter to send %s>\n", filename));
            sendFile();
        }
    }

    /**
     * Reads the whole file and multicasts it as one message.
     *
     * @throws Exception if the file cannot be read; send failures are only printed
     */
    protected void sendFile() throws Exception {
        Buffer buffer=readFile(filename);
        try {
            Message msg=new Message(null, buffer); // null destination: send to all members
            channel.send(msg);
        }
        catch(Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * Prints the size and sender of the message and writes its payload to /tmp/temp.txt,
     * replacing previous content.
     *
     * @param msg the received message
     */
    public void receive(Message msg) {
        System.out.printf("received %s from %s\n", Util.printBytes(msg.getLength()), msg.src());
        File temp=new File("/tmp/temp.txt");
        // Byte stream (not a Writer) so the content is stored unchanged; try-with-resources
        // flushes and closes the file, otherwise buffered data may never reach the disk.
        try(java.io.FileOutputStream out=new java.io.FileOutputStream(temp)) {
            byte[] payload=msg.getRawBuffer();
            if(payload != null)
                out.write(payload, msg.getOffset(), msg.getLength());
        }
        catch(IOException ie) {
            ie.printStackTrace();
        }
    }


    /**
     * Reads the complete file into a single in-memory buffer.
     *
     * @param filename path of the file to read
     * @return a buffer holding the file's content
     * @throws Exception if the file cannot be read or is too large for one buffer
     */
    protected static Buffer readFile(String filename) throws Exception {
        File file=new File(filename);
        long length=file.length();
        if(length > Integer.MAX_VALUE)
            throw new IllegalArgumentException("file too large for a single buffer: " + filename + " (" + length + " bytes)");

        // pre-sized to the file length
        ByteArrayDataOutputStream out=new ByteArrayDataOutputStream((int)length);
        try(FileInputStream input=new FileInputStream(file)) {
            byte[] read_buf=new byte[1024];
            int bytes;
            while((bytes=input.read(read_buf)) != -1)
                out.write(read_buf, 0, bytes);
        }
        return out.getBuffer();
    }


    public static void main(String[] args) throws Exception {
        if(args.length != 2) {
            System.out.printf("%s <name> <filename>\n", SimpleFileTransfer.class.getSimpleName());
            return;
        }
        new SimpleFileTransfer().start(args[0], args[1]); // name and file
    }

}

下面是更好的版本,它将大文件分成约 8K 的块发送。接收方把文件 X 写入 /tmp/X。当然,请注意必须把 /home/bela/fast.xml 换成你自己的配置文件路径:

/**
 * Chunked file transfer: each time enter is pressed the file is read in fixed-size chunks and
 * every chunk is multicast as its own message, tagged with a {@link FileHeader}. A final empty
 * message with the EOF flag set ends the transfer. Receivers write file X to /tmp/X.
 */
public class SimpleFileTransfer extends ReceiverAdapter {
    protected String   filename;
    protected JChannel channel;
    /** Open output streams of transfers in progress, keyed by the file name carried in the header. */
    protected Map<String,OutputStream> files=new ConcurrentHashMap<>();
    /** Id under which {@link FileHeader} is registered and attached to messages. */
    protected static final short ID=3500;
    /** Maximum number of file bytes carried by one message. */
    protected static final int CHUNK_SIZE=8096;

    /**
     * Registers the header class, creates and connects the channel, then runs the send loop.
     *
     * @param name     the logical name given to the channel
     * @param filename the file to send on every key press
     * @throws Exception if the channel cannot be set up or sending fails
     */
    private void start(String name, String filename) throws Exception {
        ClassConfigurator.add(ID, FileHeader.class);
        this.filename=filename;
        channel=new JChannel("/home/bela/fast.xml").name(name);
        channel.setReceiver(this);
        channel.connect("FileCluster");
        eventLoop();
    }

    /** Waits for a key press and sends the file, forever. */
    private void eventLoop() throws Exception {
        while(true) {
            Util.keyPress(String.format("<enter to send %s>\n", filename));
            sendFile();
        }
    }

    /**
     * Sends the file chunk by chunk, followed by an EOF message.
     *
     * @throws Exception if the file cannot be opened or the EOF message cannot be sent;
     *                   failures while reading or sending chunks are only printed
     */
    protected void sendFile() throws Exception {
        // try-with-resources closes the input file once the transfer is done
        try(FileInputStream in=new FileInputStream(filename)) {
            try {
                for(;;) {
                    // A fresh array per chunk rather than one reused buffer.
                    // NOTE(review): presumably the message keeps a reference to this array until it
                    // is actually sent - confirm before hoisting the allocation out of the loop.
                    byte[] buf=new byte[CHUNK_SIZE];
                    int bytes=in.read(buf);
                    if(bytes == -1)
                        break;
                    sendMessage(buf, 0, bytes, false);
                }
            }
            catch(Exception e) {
                e.printStackTrace();
            }
            finally {
                // always tell receivers the transfer is over so they close their output file
                sendMessage(null, 0, 0, true);
            }
        }
    }


    /**
     * Handles one chunk: opens /tmp/&lt;name&gt; on the first message for a file, appends the
     * payload of data messages and closes the file on the EOF message. Messages without a
     * {@link FileHeader} are ignored.
     *
     * @param msg the received message
     */
    public void receive(Message msg) {
        FileHeader hdr=(FileHeader)msg.getHeader(ID);
        if(hdr == null)
            return;
        OutputStream out=files.get(hdr.filename);
        try {
            if(out == null) {
                // only the last path element is used, so the file always lands directly in /tmp
                String fname="/tmp/" + new File(hdr.filename).getName();
                out=new FileOutputStream(fname);
                files.put(hdr.filename, out);
            }
            if(hdr.eof)
                Util.close(files.remove(hdr.filename));
            else
                out.write(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
        }
        catch(Throwable t) {
            System.err.println(t);
        }
    }


    /**
     * Multicasts one chunk (or the EOF marker) tagged with a {@link FileHeader}.
     *
     * @param buf    the array holding the chunk, or null for the EOF marker
     * @param offset start of the chunk within buf
     * @param length number of bytes in the chunk
     * @param eof    true if this message marks the end of the file
     * @throws Exception if sending fails
     */
    protected void sendMessage(byte[] buf, int offset, int length, boolean eof) throws Exception {
        Message msg=new Message(null, buf, offset, length).putHeader(ID, new FileHeader(filename, eof));
        // set this if the sender doesn't want to receive the file
        // msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
        channel.send(msg);
    }

    /** Header attached to every chunk: the name of the file it belongs to and an end-of-file flag. */
    protected static class FileHeader extends Header {
        protected String  filename;
        protected boolean eof;

        public FileHeader() {} // for de-serialization

        public FileHeader(String filename, boolean eof) {
            this.filename=filename;
            this.eof=eof;
        }

        // NOTE(review): writeTo() serializes the name with Util.writeObject() while this uses
        // Util.size(String) - confirm both agree on the number of bytes.
        public int size() {
            return Util.size(filename) + Global.BYTE_SIZE;
        }

        public void writeTo(DataOutput out) throws Exception {
            Util.writeObject(filename, out);
            out.writeBoolean(eof);
        }

        public void readFrom(DataInput in) throws Exception {
            filename=(String)Util.readObject(in);
            eof=in.readBoolean();
        }
    }

    public static void main(String[] args) throws Exception {
        if(args.length != 2) {
            System.out.printf("%s <name> <filename>\n", SimpleFileTransfer.class.getSimpleName());
            return;
        }
        new SimpleFileTransfer().start(args[0], args[1]); // name and file
    }

}