在 Java 中共享数据更新的进程间通信
Inter Process Communication to share updates to data in Java
我正在制作一个带有传感器的令牌环。我在 SensorClient
class 中定义了客户端进程。这些客户端对象从服务器进程接收有关其他客户端列表的信息,createSensor
class.
问题是我希望客户端在服务器上发生更改时更新它们拥有的信息。
服务器class:
public class createSensor {
private static createSensor instance = null;
private ArrayList<Sensor> sensor = new ArrayList<>();
public int position, prevPosition, nextPosition, prevPort, nextPort;
private createSensor() {
}
public synchronized ArrayList insertSensor(String type, String identificator, int port, String id, String gatwayAddr, long timestamp) throws IOException {
sensor.add(new Sensor(type, identificator, port, id, gatwayAddr, timestamp));
return new ArrayList<>(sensor); //
}
}
public synchronized boolean hasMeasurements() {
while (InnerBuffer.getInstance().emptyInnerBuffer())
return false;
return true;
}
public synchronized void setPrevNextWhenDelete(int position,ArrayList<Sensor> sensorList) throws IOException {
//code
}
public synchronized ArrayList<Sensor> getSensorList() {
return new ArrayList<>(sensor);
}
public synchronized int size() {
return sensor.size();
}
public synchronized String returnRecentMeasurement (String id){
String recentMeasurement=null;
for (Sensor sensori : sensor) {
if (sensori.getIdentificator().equalsIgnoreCase(id))
recentMeasurement= InnerBuffer.getInstance().returnRecentMeasurements(id);
else
recentMeasurement = null;}
return recentMeasurement;
}
public synchronized void setPrevNextWhenAdd() throws IOException { //some other code where int position, prevPosition, nextPosition, prevPort, nextPort get their values.
}}
客户端class:
public class SensorClient {
public static void main(String[] args) throws Exception {
//Starting a new sensor
Sensor sensor = new Sensor(type,identificator,portnumber,ipnumber,gatewayAddr,timestamp);
Gson gson = new Gson();
String message = gson.toJson(sensor);
Client c = Client.create();
WebResource r = c.resource("http://localhost:9999/gateway/");
ClientResponse response = r.path("sensors/add").type(MediaType.APPLICATION_JSON).accept(MediaType.APPLICATION_JSON).post(ClientResponse.class, message);
if (response.getStatus() == 200) {
repeat = false;
Type collectionType = new TypeToken<ArrayList<Sensor>>(){}.getType();
ArrayList<Sensor> sensorList = gson.fromJson(response.getEntity(String.class), collectionType);
System.out.println("Starting the sensor ...");
System.out.println("Push exit when you want to delete the sensor!");
int position = 0;
for(int i = 0; i< sensorList.size();i++){
if(sensorList.get(i).getIdentificator().equalsIgnoreCase(sensor.getIdentificator()) )
position = i;
}
sensors.Sensor.simulation(type, identificator);// special thread for sensors simulations
createSensor.getInstance().setPrevNextWhenAdd(position,sensorList);
serverSocket serverSocket = new serverSocket(portnumber,sensorList,position,sensorList.get(position).getNext());
serverSocket.start();
StopSensor stopSensor = new StopSensor(identificator,portnumber,position,sensorList);
stopSensor.start();
oneSensor s = new oneSensor(portnumber,sensorList);
s.start();
} else {
repeat = true;
count +=1;
System.out.println("Error. Wrong data! ");
}
}
while (repeat );
}
}
}
serverSocket线程
public class serverSocket extends Thread {
public int port,nextPort;
ArrayList<gateway.Sensor> sensorList;
public static int position;
public serverSocket(int port, ArrayList<gateway.Sensor> sensorList,int position,int nextPort) {
this.port = port;
this.nextPort=nextPort;
this.sensorList= sensorList;
this.position=position;}
public void run() {
ServerSocket welcomeSocket;
Socket connectionSocket;
try {
welcomeSocket = new ServerSocket(port);
while (true) {
connectionSocket = welcomeSocket.accept();
receivedMessages thread = new receivedMessages(connectionSocket,sensorList,position,nextPort);
thread.start();
}
} catch (IOException e) {
e.printStackTrace();
System.err.println("Error!!!!!!!!!");
}
}
}
receivedMessages 线程
public class receivedMessages extends Thread {
private BufferedReader inFromClient;
private Socket connectionSocket;
ArrayList<gateway.Sensor> sensorList;
int position,nextPort;
public receivedMessages(Socket socket, ArrayList<gateway.Sensor> sensorList,int position,int nextPort){
connectionSocket = socket;
this.sensorList=sensorList;
this.position=position;
this.nextPort=nextPort;
try {
inFromClient = new BufferedReader( new InputStreamReader(connectionSocket.getInputStream()));
} catch (IOException e) { e.printStackTrace(); }
}
@Override
public void run() {
// while(true) {
try {
String message = (inFromClient.readLine().toString());
System.out.println(message);
if (message.startsWith("Next") || message.startsWith("Previous")) {
System.out.println(message);
} else if (message.startsWith("The")) {
System.out.println(message); createSensor.getInstance().setPrevNextWhenDelete(position, sensorList);
} else {// i receive the message that the list has changed
System.out.println(message);
sensorList = createSensor.getInstance().getSensorList();
System.out.println("Updated " + sensorList);}
监听变化
当数据被修改时,您的客户端进程有两种可能被通知:
您的客户端对象可以定期检查列表是否已被修改:
- 通过询问列表的当前版本
- 或通过请求列表修改的时间戳(您的服务器 class、
createSensor
必须保留该记录)
您的客户端对象可以打开一个端口来收听通知,并且您的服务器 class 可以通知它们更改,并在列表被修改时将新列表推送给它们。
在侦听器线程和工作线程之间共享更新
在这两种情况下,在您的客户端,您将有一个线程执行工作,另一个线程侦听更改。第二个线程每次都会重建一个新列表。为了与工作线程共享它,您可以执行以下操作:
- 您的侦听器线程将保留对
sensorList
的引用,该引用先前在创建时由工作线程传递
- 重建新列表时,您的侦听器线程会将新列表存储在另一个引用中,
newList
- 然后您的听众列表可以修改 "old" 列表,使其与新列表匹配,方法是先使用
oldList.clear()
然后 oldList.addAll(newList)
。
这会起作用,因为即使工作线程可能有自己的列表引用,两个引用都指向同一个对象,所以它会看到更改。
启动监听线程的代码
例如,如果你的工作线程是主线程,它正在创建监听线程,你的代码可能是这样的,在工作线程代码中:
class Listener implements Runnable {
List<Sensor> sensorList;
Listener(List<Sensor> list ){
sensorList = list;
}
public void run(){
// code to listen to changes on the server
// When server sends new information, thread can update the list directly:
// sensorList.clear();
// sensorList.addAll(newList);
}
}
Listener l = new Listener(sensorList);
Thread listenerThread = new Thread(l);
listenerThread.start();
您可以在 Java Tutorials on Concurrency.
中阅读有关线程以及它们如何共享内存的信息
不过要小心同步:当您更新旧列表时,您应该确保其他线程没有迭代它。您可能应该使用 Collections.synchronizedList(sensorList)
来确保没有线程干扰。
我正在制作一个带有传感器的令牌环。我在 SensorClient
class 中定义了客户端进程。这些客户端对象从服务器进程接收有关其他客户端列表的信息,createSensor
class.
问题是我希望客户端在服务器上发生更改时更新它们拥有的信息。
服务器class:
public class createSensor {
private static createSensor instance = null;
private ArrayList<Sensor> sensor = new ArrayList<>();
public int position, prevPosition, nextPosition, prevPort, nextPort;
private createSensor() {
}
public synchronized ArrayList insertSensor(String type, String identificator, int port, String id, String gatwayAddr, long timestamp) throws IOException {
sensor.add(new Sensor(type, identificator, port, id, gatwayAddr, timestamp));
return new ArrayList<>(sensor); //
}
}
public synchronized boolean hasMeasurements() {
while (InnerBuffer.getInstance().emptyInnerBuffer())
return false;
return true;
}
public synchronized void setPrevNextWhenDelete(int position,ArrayList<Sensor> sensorList) throws IOException {
//code
}
public synchronized ArrayList<Sensor> getSensorList() {
return new ArrayList<>(sensor);
}
public synchronized int size() {
return sensor.size();
}
public synchronized String returnRecentMeasurement (String id){
String recentMeasurement=null;
for (Sensor sensori : sensor) {
if (sensori.getIdentificator().equalsIgnoreCase(id))
recentMeasurement= InnerBuffer.getInstance().returnRecentMeasurements(id);
else
recentMeasurement = null;}
return recentMeasurement;
}
public synchronized void setPrevNextWhenAdd() throws IOException { //some other code where int position, prevPosition, nextPosition, prevPort, nextPort get their values.
}}
客户端class:
public class SensorClient {
public static void main(String[] args) throws Exception {
//Starting a new sensor
Sensor sensor = new Sensor(type,identificator,portnumber,ipnumber,gatewayAddr,timestamp);
Gson gson = new Gson();
String message = gson.toJson(sensor);
Client c = Client.create();
WebResource r = c.resource("http://localhost:9999/gateway/");
ClientResponse response = r.path("sensors/add").type(MediaType.APPLICATION_JSON).accept(MediaType.APPLICATION_JSON).post(ClientResponse.class, message);
if (response.getStatus() == 200) {
repeat = false;
Type collectionType = new TypeToken<ArrayList<Sensor>>(){}.getType();
ArrayList<Sensor> sensorList = gson.fromJson(response.getEntity(String.class), collectionType);
System.out.println("Starting the sensor ...");
System.out.println("Push exit when you want to delete the sensor!");
int position = 0;
for(int i = 0; i< sensorList.size();i++){
if(sensorList.get(i).getIdentificator().equalsIgnoreCase(sensor.getIdentificator()) )
position = i;
}
sensors.Sensor.simulation(type, identificator);// special thread for sensors simulations
createSensor.getInstance().setPrevNextWhenAdd(position,sensorList);
serverSocket serverSocket = new serverSocket(portnumber,sensorList,position,sensorList.get(position).getNext());
serverSocket.start();
StopSensor stopSensor = new StopSensor(identificator,portnumber,position,sensorList);
stopSensor.start();
oneSensor s = new oneSensor(portnumber,sensorList);
s.start();
} else {
repeat = true;
count +=1;
System.out.println("Error. Wrong data! ");
}
}
while (repeat );
}
}
}
serverSocket线程
public class serverSocket extends Thread {
public int port,nextPort;
ArrayList<gateway.Sensor> sensorList;
public static int position;
public serverSocket(int port, ArrayList<gateway.Sensor> sensorList,int position,int nextPort) {
this.port = port;
this.nextPort=nextPort;
this.sensorList= sensorList;
this.position=position;}
public void run() {
ServerSocket welcomeSocket;
Socket connectionSocket;
try {
welcomeSocket = new ServerSocket(port);
while (true) {
connectionSocket = welcomeSocket.accept();
receivedMessages thread = new receivedMessages(connectionSocket,sensorList,position,nextPort);
thread.start();
}
} catch (IOException e) {
e.printStackTrace();
System.err.println("Error!!!!!!!!!");
}
}
}
receivedMessages 线程
public class receivedMessages extends Thread {
private BufferedReader inFromClient;
private Socket connectionSocket;
ArrayList<gateway.Sensor> sensorList;
int position,nextPort;
public receivedMessages(Socket socket, ArrayList<gateway.Sensor> sensorList,int position,int nextPort){
connectionSocket = socket;
this.sensorList=sensorList;
this.position=position;
this.nextPort=nextPort;
try {
inFromClient = new BufferedReader( new InputStreamReader(connectionSocket.getInputStream()));
} catch (IOException e) { e.printStackTrace(); }
}
@Override
public void run() {
// while(true) {
try {
String message = (inFromClient.readLine().toString());
System.out.println(message);
if (message.startsWith("Next") || message.startsWith("Previous")) {
System.out.println(message);
} else if (message.startsWith("The")) {
System.out.println(message); createSensor.getInstance().setPrevNextWhenDelete(position, sensorList);
} else {// i receive the message that the list has changed
System.out.println(message);
sensorList = createSensor.getInstance().getSensorList();
System.out.println("Updated " + sensorList);}
监听变化
当数据被修改时,您的客户端进程有两种可能被通知:
您的客户端对象可以定期检查列表是否已被修改:
- 通过询问列表的当前版本
- 或通过请求列表修改的时间戳(您的服务器 class、
createSensor
必须保留该记录)
您的客户端对象可以打开一个端口来收听通知,并且您的服务器 class 可以通知它们更改,并在列表被修改时将新列表推送给它们。
在侦听器线程和工作线程之间共享更新
在这两种情况下,在您的客户端,您将有一个线程执行工作,另一个线程侦听更改。第二个线程每次都会重建一个新列表。为了与工作线程共享它,您可以执行以下操作:
- 您的侦听器线程将保留对
sensorList
的引用,该引用先前在创建时由工作线程传递 - 重建新列表时,您的侦听器线程会将新列表存储在另一个引用中,
newList
- 然后您的听众列表可以修改 "old" 列表,使其与新列表匹配,方法是先使用
oldList.clear()
然后oldList.addAll(newList)
。
这会起作用,因为即使工作线程可能有自己的列表引用,两个引用都指向同一个对象,所以它会看到更改。
启动监听线程的代码
例如,如果你的工作线程是主线程,它正在创建监听线程,你的代码可能是这样的,在工作线程代码中:
class Listener implements Runnable {
List<Sensor> sensorList;
Listener(List<Sensor> list ){
sensorList = list;
}
public void run(){
// code to listen to changes on the server
// When server sends new information, thread can update the list directly:
// sensorList.clear();
// sensorList.addAll(newList);
}
}
Listener l = new Listener(sensorList);
Thread listenerThread = new Thread(l);
listenerThread.start();
您可以在 Java Tutorials on Concurrency.
中阅读有关线程以及它们如何共享内存的信息不过要小心同步:当您更新旧列表时,您应该确保其他线程没有迭代它。您可能应该使用 Collections.synchronizedList(sensorList)
来确保没有线程干扰。