使用 jax-rs jetty 重新传输 sse

Retransmit SSE using JAX-RS and Jetty

我正在尝试从一个 OSGi 插件中转发(重新传输)SSE。下面是我的代码:目前我只能从事件源读取 SSE 并在控制台中正确打印出来,但 SSE 的写入部分工作不正常——事件被缓冲了,而没有立即发送给客户端。有什么方法可以修复它,并在同一个函数中同时完成读取和写入吗?提前致谢!

/**
 * JAX-RS application published under the {@code MySensor} application path.
 *
 * <p>The class also declares the {@code GET} resource method that relays Server-Sent
 * Events from an upstream sensor endpoint.
 */
@ApplicationPath("MySensor")
public class MySensor extends ResourceConfig {
    // Name of the upstream sensor; used as the URL path segment and as the prefix of idn.
    private static String sensorA = "smotion";
    // private static String sensorB = "sdist";
    // private static String sensorC = "slight";
    // Request counter, incremented on every call to getServerSentEvents().
    private int id = 0;
    // Label for the latest request, built as sensorA + " " + id.
    private String idn = "";

/**
 * Builds the resource configuration, passing this class itself and Jersey's
 * {@code SseFeature} to the {@code ResourceConfig} constructor.
 */
public MySensor() {
    super(MySensor.class, SseFeature.class);
}

// Single broadcaster shared by all requests; final so the instance can never be swapped out.
private static final SseBroadcaster BROADCASTER = new SseBroadcaster();

/**
 * Reads Server-Sent Events from the upstream sensor endpoint and copies each one into a
 * new {@link EventOutput}, which is returned once the upstream stream has ended.
 *
 * <p>Each call also increments {@code id}, rebuilds {@code idn}, and broadcasts
 * {@code idn} through {@code BROADCASTER} before connecting upstream.
 *
 * <p>NOTE(review): the read loop runs on the calling thread and {@code eventOutput} is
 * closed before it is returned, so the output is not handed back while events are still
 * arriving. This is presumably why clients see the events buffered; confirm against
 * Jersey's chunked-output behaviour.
 *
 * @return the event output, which has already been closed when this method returns
 * @throws RuntimeException if writing an event or closing the output fails with an
 *         {@link IOException}
 */
@MethodDescription(value = "sse")
@GET
@Consumes(SseFeature.SERVER_SENT_EVENTS)
@Produces(SseFeature.SERVER_SENT_EVENTS)
public EventOutput getServerSentEvents() {
    // Per-request label: sensor name followed by a running counter.
    id = id + 1;
    idn = sensorA + " " + id;
    // Announce this request's label through the shared broadcaster.
    BROADCASTER.broadcast(new OutboundEvent.Builder().data(String.class, idn).build());

    // System.out.println(BROADCASTER.);

    // Host of the upstream event source.
    String LocalNetworkIP = "192.168.1.134";

    EventOutput eventOutput = new EventOutput();
    // A new client is built on every call. NOTE(review): it is never closed in this method.
    Client client = ClientBuilder.newBuilder().register(SseFeature.class).build();
    WebTarget target = client.target("http://" + LocalNetworkIP + "/" + sensorA);
    EventInput eventInput = target.request().get(EventInput.class);

    // Pull upstream events one at a time until the input reports that it is closed.
    while (!eventInput.isClosed()) {

        InboundEvent inboundEvent = eventInput.read();

        if (inboundEvent == null) {
            break; // connection has been closed
        }
        try {
            // handleevent
            // inboundEvent.readData(String.class);
            // readData is invoked twice per event: once for the console, once for the relayed payload.
            System.out.println(inboundEvent.readData(String.class));
            OutboundEvent.Builder eventBuilder = new OutboundEvent.Builder();
            eventBuilder.name(inboundEvent.getName());
            eventBuilder.data(inboundEvent.readData(String.class));
            OutboundEvent event = eventBuilder.build();
            eventOutput.write(event);
            // NOTE(review): the same eventOutput is re-added to the broadcaster for every
            // event; confirm whether a single registration was intended.
            BROADCASTER.add(eventOutput);
        } catch (IOException e) {
            throw new RuntimeException("Error when writing the event.", e);
        } catch (Exception e) {
            // Any other failure is only printed; the loop moves on to the next event.
            e.printStackTrace();
        } finally {

        }
    }

    // The output is closed here, before it is returned to the caller.
    try {
        eventOutput.close();
    } catch (IOException ioClose) {
        throw new RuntimeException("Error when closing the event output.", ioClose);
    }

    return eventOutput;
}

终于找到解决办法了!关键是在单独的线程中读取并转发事件,让资源方法立即返回 EventOutput:

/**
 * Relays the upstream sensor's Server-Sent Events stream to the requesting client.
 *
 * <p>The upstream connection is opened on the calling thread, but the events are copied
 * on a separate thread so that this method can return the {@link EventOutput} right away
 * instead of blocking until the upstream stream ends.
 *
 * <p>As before, each call increments {@code id} and rebuilds {@code idn}.
 *
 * @return the open event output that the relay thread writes to; the relay thread closes
 *         it when the upstream ends or a write to it fails
 */
@MethodDescription(value = "Return the Server Sent Event")
@GET
@Consumes(SseFeature.SERVER_SENT_EVENTS)
@Produces(SseFeature.SERVER_SENT_EVENTS)
public EventOutput getServerSentEvents() {
    id = id + 1;
    idn = sensorA + " " + id;

    // Host of the upstream event source.
    final String localNetworkIp = "192.168.1.134";

    final EventOutput eventOutput = new EventOutput();
    final Client client = ClientBuilder.newBuilder().register(SseFeature.class).build();
    final EventInput eventInput;
    try {
        WebTarget target = client.target("http://" + localNetworkIp + "/" + sensorA);
        eventInput = target.request().get(EventInput.class);
    } catch (RuntimeException connectFailure) {
        // The relay thread never starts on this path, so release the client here.
        client.close();
        throw connectFailure;
    }

    new Thread(new Runnable() {
        @Override
        public void run() {
            relayEvents(eventInput, eventOutput, client);
        }
    }).start();
    return eventOutput;
}

/**
 * Copies events from {@code eventInput} to {@code eventOutput} until the input ends or a
 * write fails, then closes the output, the input and the client.
 */
private static void relayEvents(EventInput eventInput, EventOutput eventOutput, Client client) {
    try {
        while (!eventInput.isClosed()) {
            InboundEvent inboundEvent = eventInput.read();

            if (inboundEvent == null) {
                break; // connection has been closed
            }
            try {
                // Decode the payload once and reuse it for the console and the relayed event.
                String data = inboundEvent.readData(String.class);
                System.out.println(data);
                OutboundEvent event = new OutboundEvent.Builder()
                        .name(inboundEvent.getName())
                        .data(data)
                        .build();
                eventOutput.write(event);
            } catch (IOException writeFailure) {
                // The output can no longer be written to; stop relaying and fall through to cleanup.
                writeFailure.printStackTrace();
                break;
            } catch (Exception e) {
                // A single bad event is reported and skipped; relaying continues.
                e.printStackTrace();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Close each resource on its own so that one failure cannot leak the others.
        try {
            if (!eventOutput.isClosed()) {
                eventOutput.close();
            }
        } catch (Exception closeFailure) {
            closeFailure.printStackTrace();
        }
        try {
            if (!eventInput.isClosed()) {
                eventInput.close();
            }
        } catch (Exception closeFailure) {
            closeFailure.printStackTrace();
        }
        try {
            client.close();
        } catch (Exception closeFailure) {
            closeFailure.printStackTrace();
        }
    }
}

}