Android 服务中的数据并发与传感器数据

Data concurrency in Android Service with sensor data

我的应用程序是 运行 一个 Service,它与多传感器腕带保持 BLE 连接。 Service 实现了腕带 SDK 的一些回调方法,每秒钟调用几次新数据。

我想将来自不同传感器的这些数据放在同一个 Observation 对象中,并与其时间戳相关。所有 Observation 个对象每 60 秒被推送到后端服务器,传感器数据被放在一起以减少发送这些 Observation 个对象的开销。

下面的代码片段介绍了我现在正在做的事情。我的问题是 observationFetcher 中的 while 循环完全阻止了应用程序。是否有任何其他方法可以在不使用块 while 循环的情况下同步这些传感器数据?

    observationFetcher = new Runnable() {
        @Override
        public void run() {
            while (isRecording) {
                if (lastMillis != currentMillis) {
                    Observation obs = sm.getValues();
                    obs.setPropertyAsString("gateway.id", UUID);
                    observations.add(obs);
                    lastMillis = currentMillis;
                }
            }
        }
    };

public void didReceiveGSR(float gsr, double timestamp) {
    long t = System.currentTimeMillis() / 1000;

    sm.setGsrValue(t, gsr);
    currentMillis = t;
}

public void didReceiveIBI(float ibi, double timestamp) {
    sm.setIbiValue(ibi);
}

sm 是一个具有 synchronized 方法的对象,用于将同一秒内的所有传感器数据放在一起。

如果我错了请纠正我,但我看不出有理由浪费 CPU 时间无限迭代。当然,我没有看到完整的代码,你的 API 可能不允许你做一些事情,但我会按以下方式实现数据处理:

final class Observation {
    private float gsr;
    private float ibi;

    public Observation(float gsr, float ibi) {
        this.gsr = gsr;
        this.ibi = ibi;
    }

    // getters & setters

}

public final class Observations {
    private final ConcurrentHashMap<Long, Observation> observations = new ConcurrentHashMap<>();

    public void insertGsrValue(long timestamp, float gsr) {
        for (;;) {
            Observation observation = observations.get(timestamp);
            if (observation == null) {
                observation = observations.putIfAbsent(timestamp, new Observation(gsr, 0.0f));
                if (observation == null) {
                    return;
                }
            }
            if (observations.replace(timestamp, observation, new Observation(gsr, observation.getIbi()))) {
                return;
            }
        }
    }

    public void insertIbiValue(long timestamp, float ibi) {
        for (;;) {
            Observation observation = observations.get(timestamp);
            if (observation == null) {
                observation = observations.putIfAbsent(timestamp, new Observation(0.0f, ibi));
                if (observation == null) {
                    return;
                }
            }
            if (observations.replace(timestamp, observation, new Observation(observation.getGsr(), ibi))) {
                return;
            }
        }
    }

    public List<Observation> getObservations() {
        return new ArrayList<>(observations.values());
    }

    public void clear() {
        observations.clear();
    }

}

public final class ObservationService extends Service {
    private final Observations observations = new Observations();
    private volatile long currentMillis;
    private HandlerThread handlerThread;
    private Handler handler;

    @Override
    public void onCreate() {
        super.onCreate();
        handlerThread = new HandlerThread("observations_sender_thread");
        handlerThread.start();
        handler = new Handler(handlerThread.getLooper());
        handler.postDelayed(new Runnable() {
            @Override
            public void run() {
                sendData();
                handler.postDelayed(this, TimeUnit.SECONDS.toMillis(60));
            }
        }, TimeUnit.SECONDS.toMillis(60));
    }

    @Override
    public void onDestroy() {
        handlerThread.quit();
    }

    private void sendData() {
        List<Observation> observationList = observations.getObservations();
        observations.clear();
        // send observation list somehow
    }

    public void didReceiveGSR(float gsr, double timestamp) {
        // assuming this is called on a worker thread
        long t = System.currentTimeMillis() / 1000;
        observations.insertGsrValue(t, gsr);
        currentMillis = t;
    }

    public void didReceiveIBI(float ibi, double timestamp) {
        // assuming this is called on a worker thread
        observations.insertIbiValue(currentMillis, ibi);
    }

    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }
}

所以这段代码所做的是将来自传感器的新值插入到哈希映射中,并每 60 秒将其发送到某个地方。这段代码仍然不完美,因为存在并发问题。例如,如果先有 2 个 gsr 值,然后有一个 ibi 值,那么我们将丢失第一个 gsr 值。

无论如何,这段代码应该让您了解如何避免阻塞线程并存储数据并发。

如果您对代码有任何疑问,请告诉我。