使用 jax-rs jetty 重新传输 sse
Retrasmit sse using jax-rs jetty
我尝试从 osgi 插件重新传输 sse。我有这段代码,目前我只能从事件源读取 sse 并在控制台中正确打印出来。 sse 写入部分工作不正常,而是缓冲。有什么方法可以修复它并从同一个函数读取和写入吗?
提前致谢!
@ApplicationPath("MySensor")
public class MySensor extends ResourceConfig {
private static String sensorA = "smotion";
// private static String sensorB = "sdist";
// private static String sensorC = "slight";
private int id = 0;
private String idn = "";
public MySensor() {
super(MySensor.class, SseFeature.class);
}
// creates new broadcaster
private static SseBroadcaster BROADCASTER = new SseBroadcaster();
@MethodDescription(value = "sse")
@GET
@Consumes(SseFeature.SERVER_SENT_EVENTS)
@Produces(SseFeature.SERVER_SENT_EVENTS)
public EventOutput getServerSentEvents() {
id = id + 1;
idn = sensorA + " " + id;
BROADCASTER.broadcast(new OutboundEvent.Builder().data(String.class, idn).build());
// System.out.println(BROADCASTER.);
String LocalNetworkIP = "192.168.1.134";
EventOutput eventOutput = new EventOutput();
Client client = ClientBuilder.newBuilder().register(SseFeature.class).build();
WebTarget target = client.target("http://" + LocalNetworkIP + "/" + sensorA);
EventInput eventInput = target.request().get(EventInput.class);
while (!eventInput.isClosed()) {
InboundEvent inboundEvent = eventInput.read();
if (inboundEvent == null) {
break; // connection has been closed
}
try {
// handleevent
// inboundEvent.readData(String.class);
System.out.println(inboundEvent.readData(String.class));
OutboundEvent.Builder eventBuilder = new OutboundEvent.Builder();
eventBuilder.name(inboundEvent.getName());
eventBuilder.data(inboundEvent.readData(String.class));
OutboundEvent event = eventBuilder.build();
eventOutput.write(event);
BROADCASTER.add(eventOutput);
} catch (IOException e) {
throw new RuntimeException("Error when writing the event.", e);
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
try {
eventOutput.close();
} catch (IOException ioClose) {
throw new RuntimeException("Error when closing the event output.", ioClose);
}
return eventOutput;
}
终于找到了!
@MethodDescription(value = "Return the Server Sent Event")
@GET
@Consumes(SseFeature.SERVER_SENT_EVENTS)
@Produces(SseFeature.SERVER_SENT_EVENTS)
public EventOutput getServerSentEvents() {
id = id + 1;
idn = sensorA + " " + id;
final EventOutput eventOutput = new EventOutput();
Client client = ClientBuilder.newBuilder().register(SseFeature.class).build();
WebTarget target = client.target("http://" + LocalNetworkIP + "/" + sensorA);
final EventInput eventInput = target.request().get(EventInput.class);
new Thread(new Runnable() {
@Override
public synchronized void run() {
try {
while (!eventInput.isClosed()) {
// Thread.sleep(500);
InboundEvent inboundEvent = eventInput.read();
if (inboundEvent == null) {
break; // connection has been closed
}
try {
// handleevent
// inboundEvent.readData(String.class);
System.out.println(inboundEvent.readData(String.class));
OutboundEvent.Builder eventBuilder = new OutboundEvent.Builder();
eventBuilder.name(inboundEvent.getName());
eventBuilder.data(inboundEvent.readData(String.class));
OutboundEvent event = eventBuilder.build();
eventOutput.write(event);
} catch (IOException e) {
try { //extra
eventOutput.close(); //extra
eventInput.close(); //extra
} catch (IOException ioClose) { //extra
throw new RuntimeException("Error when closing the event output internal.", ioClose); //extra
} //extra
throw new RuntimeException("Error when writing or reading the event.", e);
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (!eventOutput.isClosed()) { //extra
eventOutput.close(); //extra
} //extra
if (!eventInput.isClosed()) { //extra
eventInput.close();
} //extra
} catch (IOException ioClose) {
throw new RuntimeException("Error when closing the event output.", ioClose);
}
}
}
}).start();
return eventOutput;
}
}
我尝试从 osgi 插件重新传输 sse。我有这段代码,目前我只能从事件源读取 sse 并在控制台中正确打印出来。 sse 写入部分工作不正常,而是缓冲。有什么方法可以修复它并从同一个函数读取和写入吗? 提前致谢!
@ApplicationPath("MySensor")
public class MySensor extends ResourceConfig {
private static String sensorA = "smotion";
// private static String sensorB = "sdist";
// private static String sensorC = "slight";
private int id = 0;
private String idn = "";
public MySensor() {
super(MySensor.class, SseFeature.class);
}
// creates new broadcaster
private static SseBroadcaster BROADCASTER = new SseBroadcaster();
@MethodDescription(value = "sse")
@GET
@Consumes(SseFeature.SERVER_SENT_EVENTS)
@Produces(SseFeature.SERVER_SENT_EVENTS)
public EventOutput getServerSentEvents() {
id = id + 1;
idn = sensorA + " " + id;
BROADCASTER.broadcast(new OutboundEvent.Builder().data(String.class, idn).build());
// System.out.println(BROADCASTER.);
String LocalNetworkIP = "192.168.1.134";
EventOutput eventOutput = new EventOutput();
Client client = ClientBuilder.newBuilder().register(SseFeature.class).build();
WebTarget target = client.target("http://" + LocalNetworkIP + "/" + sensorA);
EventInput eventInput = target.request().get(EventInput.class);
while (!eventInput.isClosed()) {
InboundEvent inboundEvent = eventInput.read();
if (inboundEvent == null) {
break; // connection has been closed
}
try {
// handleevent
// inboundEvent.readData(String.class);
System.out.println(inboundEvent.readData(String.class));
OutboundEvent.Builder eventBuilder = new OutboundEvent.Builder();
eventBuilder.name(inboundEvent.getName());
eventBuilder.data(inboundEvent.readData(String.class));
OutboundEvent event = eventBuilder.build();
eventOutput.write(event);
BROADCASTER.add(eventOutput);
} catch (IOException e) {
throw new RuntimeException("Error when writing the event.", e);
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
try {
eventOutput.close();
} catch (IOException ioClose) {
throw new RuntimeException("Error when closing the event output.", ioClose);
}
return eventOutput;
}
终于找到了!
@MethodDescription(value = "Return the Server Sent Event")
@GET
@Consumes(SseFeature.SERVER_SENT_EVENTS)
@Produces(SseFeature.SERVER_SENT_EVENTS)
public EventOutput getServerSentEvents() {
id = id + 1;
idn = sensorA + " " + id;
final EventOutput eventOutput = new EventOutput();
Client client = ClientBuilder.newBuilder().register(SseFeature.class).build();
WebTarget target = client.target("http://" + LocalNetworkIP + "/" + sensorA);
final EventInput eventInput = target.request().get(EventInput.class);
new Thread(new Runnable() {
@Override
public synchronized void run() {
try {
while (!eventInput.isClosed()) {
// Thread.sleep(500);
InboundEvent inboundEvent = eventInput.read();
if (inboundEvent == null) {
break; // connection has been closed
}
try {
// handleevent
// inboundEvent.readData(String.class);
System.out.println(inboundEvent.readData(String.class));
OutboundEvent.Builder eventBuilder = new OutboundEvent.Builder();
eventBuilder.name(inboundEvent.getName());
eventBuilder.data(inboundEvent.readData(String.class));
OutboundEvent event = eventBuilder.build();
eventOutput.write(event);
} catch (IOException e) {
try { //extra
eventOutput.close(); //extra
eventInput.close(); //extra
} catch (IOException ioClose) { //extra
throw new RuntimeException("Error when closing the event output internal.", ioClose); //extra
} //extra
throw new RuntimeException("Error when writing or reading the event.", e);
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (!eventOutput.isClosed()) { //extra
eventOutput.close(); //extra
} //extra
if (!eventInput.isClosed()) { //extra
eventInput.close();
} //extra
} catch (IOException ioClose) {
throw new RuntimeException("Error when closing the event output.", ioClose);
}
}
}
}).start();
return eventOutput;
}
}