如何通过多个 Java 线程使用只读借用的 Rust 数据?
How to use read-only borrowed Rust data by multiple Java threads?
我有一个结构 Foo
和 FooRef
引用了来自 Foo
:
的数据
struct Foo { /* ... */ }
struct FooRef<'foo> { /* ... */ }
impl Foo {
pub fn create_ref<'a>(&'a self) -> FooRef<'a> { /* ... */ }
}
现在Foo
逻辑中直接不能用了;我需要 FooRef
。创建 FooRef
需要大量计算,所以我在创建 Foo
实例后就做了一次。 FooRef
是不可变的;它仅用于读取数据。
多个线程需要访问此 FooRef
实例。我该如何实施?调用线程是 Java 个线程,这将与 JNI 一起使用。例如,这可以防止使用作用域线程池。
另一个复杂的问题是,当我必须刷新 Foo
实例以将新数据加载到其中时。然后我还需要重新创建 FooRef
实例。
如何实现线程安全和内存安全?我尝试弄乱指针和 RwLock
但这导致了内存泄漏(内存使用量在每次重新加载时不断增加)。我是一名 Java 开发人员,是指针的新手。
Foo
中的数据主要是文本,大约 250Mb。 FooRef
主要是 str
和从 Foo
借来的 str
的结构。
我的Java使用说明
我在 Java class 中使用两个 long
变量来存储指向 Foo
和 FooRef
的指针。我使用静态 ReentrantReadWriteLock
来保护这些指针。
如果 Foo
中的数据需要更新,我获取写锁,删除 FooRef
,更新 Foo
,创建一个新的 FooRef
并更新Java.
中的 ref 指针
如果我需要读取数据(即当我不更新 Foo
时),我会获取读取锁并使用 FooRef
.
内存泄漏仅在多个 Java 线程调用此代码时可见。
生锈:
use jni::objects::{JClass, JString};
use jni::sys::{jlong, jstring};
use jni::JNIEnv;
use std::collections::HashMap;
macro_rules! foo_mut_ptr {
($env: expr, $class: expr) => {
$env.get_field(*$class, "ptr", "J")
.ok()
.and_then(|j| j.j().ok())
.and_then(|ptr| {
if ptr == 0 {
None
} else {
Some(ptr as *mut Foo)
}
})
};
}
macro_rules! foo_ref_mut_ptr {
($env: expr, $class: expr) => {
$env.get_field(*$class, "ptrRef", "J")
.ok()
.and_then(|j| j.j().ok())
.and_then(|ptr| {
if ptr == 0 {
None
} else {
Some(ptr as *mut FooRef)
}
})
};
}
macro_rules! foo_mut {
($env: expr, $class: expr) => {
foo_mut_ptr!($env, $class).map(|ptr| &mut *ptr)
};
}
macro_rules! foo_ref {
($env: expr, $class: expr) => {
foo_ref_mut_ptr!($env, $class).map(|ptr| &*ptr)
};
}
#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_create(_env: JNIEnv, _class: JClass) -> jlong {
Box::into_raw(Box::new(Foo::default())) as jlong
}
#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_createRef(env: JNIEnv, class: JClass) -> jlong {
let foo = foo_mut!(env, class).expect("createRef was called on uninitialized Data");
let foo_ref = foo.create_ref();
Box::into_raw(Box::new(foo_ref)) as jlong
}
#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_reload(env: JNIEnv, class: JClass) {
let foo = foo_mut!(env, class).expect("foo must be initialized");
*foo = Foo {
data: vec!["hello".to_owned(); 1024 * 1024],
};
}
#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_destroy(env: JNIEnv, class: JClass) {
drop_ptr(foo_ref_mut_ptr!(env, class));
drop_ptr(foo_mut_ptr!(env, class));
}
#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_destroyRef(env: JNIEnv, class: JClass) {
drop_ptr(foo_ref_mut_ptr!(env, class));
}
unsafe fn drop_ptr<T>(ptr: Option<*mut T>) {
if let Some(ptr) = ptr {
let _foo = Box::from_raw(ptr);
// foo drops here
}
}
#[derive(Default)]
struct Foo {
data: Vec<String>,
}
#[derive(Default)]
struct FooRef<'a> {
data: HashMap<&'a str, Vec<&'a str>>,
}
impl Foo {
fn create_ref(&self) -> FooRef {
let mut data = HashMap::new();
for s in &self.data {
let s = &s[..];
data.insert(s, vec![s]);
}
FooRef { data }
}
}
Java:
package test;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
public class App implements AutoCloseable {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReadLock readLock = lock.readLock();
private final WriteLock writeLock = lock.writeLock();
private volatile long ptr;
private volatile long ptrRef;
private volatile boolean reload;
static {
System.loadLibrary("foo");
}
public static void main(String[] args) throws InterruptedException {
try (App app = new App()) {
for (int i = 0; i < 20; i++) {
new Thread(() -> {
while (true) {
app.tryReload();
}
}).start();
}
while (true) {
app.setReload();
}
}
}
public App() {
this.ptr = this.create();
}
public void setReload() {
writeLock.lock();
try {
reload = true;
} finally {
writeLock.unlock();
}
}
public void tryReload() {
readLock.lock();
debug("Got read lock");
if (reload) {
debug("Cache is expired");
readLock.unlock();
debug("Released read lock coz expired");
writeLock.lock();
debug("Got write lock");
try {
if (reload) {
fullReload();
}
readLock.lock();
debug("Got read lock inside write");
} finally {
writeLock.unlock();
debug("Released write lock");
}
}
readLock.unlock();
debug("Released read lock");
}
private void fullReload() {
destroyRef();
debug("Dropped ref");
debug("Reloading");
reload();
debug("Reloading completed");
updateRef();
debug("Created ref");
reload = false;
}
private void updateRef() {
this.ptrRef = this.createRef();
}
private native void reload();
private native long create();
private native long createRef();
private native void destroy();
private native void destroyRef();
@Override
public void close() {
writeLock.lock();
try {
this.destroy();
this.ptrRef = 0;
this.ptr = 0;
} finally {
writeLock.unlock();
}
}
private static void debug(String s) {
System.out.printf("%10s : %s%n", Thread.currentThread().getName(), s);
}
}
我认为是内存泄漏的问题实际上并不是内存泄漏。问题是分配器使用的是线程局部区域。因此,无论什么线程正在重新加载 250MB 的数据,都将分配的 space 保持原样,而不是将其返回给系统。这个问题不是 JNI 特有的,也发生在纯安全的 Rust 代码中。参见
在我的例子中,创建的默认竞技场数量默认为 8 * cpu count = 64。可以通过设置 MALLOC_ARENA_MAX
环境变量来覆盖此设置。
所以我通过将 MALLOC_ARENA_MAX
环境变量设置为 1 解决了这个问题。所以,我采取的方法很好。这只是特定于平台的问题。
此问题仅在 WSL 的 Ubuntu 中出现。我也在 Windows 10 上尝试了相同的代码,但没有任何调整,它运行完美,没有任何问题。
我有一个结构 Foo
和 FooRef
引用了来自 Foo
:
struct Foo { /* ... */ }
struct FooRef<'foo> { /* ... */ }
impl Foo {
pub fn create_ref<'a>(&'a self) -> FooRef<'a> { /* ... */ }
}
现在Foo
逻辑中直接不能用了;我需要 FooRef
。创建 FooRef
需要大量计算,所以我在创建 Foo
实例后就做了一次。 FooRef
是不可变的;它仅用于读取数据。
多个线程需要访问此 FooRef
实例。我该如何实施?调用线程是 Java 个线程,这将与 JNI 一起使用。例如,这可以防止使用作用域线程池。
另一个复杂的问题是,当我必须刷新 Foo
实例以将新数据加载到其中时。然后我还需要重新创建 FooRef
实例。
如何实现线程安全和内存安全?我尝试弄乱指针和 RwLock
但这导致了内存泄漏(内存使用量在每次重新加载时不断增加)。我是一名 Java 开发人员,是指针的新手。
Foo
中的数据主要是文本,大约 250Mb。 FooRef
主要是 str
和从 Foo
借来的 str
的结构。
我的Java使用说明
我在 Java class 中使用两个 long
变量来存储指向 Foo
和 FooRef
的指针。我使用静态 ReentrantReadWriteLock
来保护这些指针。
如果 Foo
中的数据需要更新,我获取写锁,删除 FooRef
,更新 Foo
,创建一个新的 FooRef
并更新Java.
如果我需要读取数据(即当我不更新 Foo
时),我会获取读取锁并使用 FooRef
.
内存泄漏仅在多个 Java 线程调用此代码时可见。
生锈:
use jni::objects::{JClass, JString};
use jni::sys::{jlong, jstring};
use jni::JNIEnv;
use std::collections::HashMap;
macro_rules! foo_mut_ptr {
($env: expr, $class: expr) => {
$env.get_field(*$class, "ptr", "J")
.ok()
.and_then(|j| j.j().ok())
.and_then(|ptr| {
if ptr == 0 {
None
} else {
Some(ptr as *mut Foo)
}
})
};
}
macro_rules! foo_ref_mut_ptr {
($env: expr, $class: expr) => {
$env.get_field(*$class, "ptrRef", "J")
.ok()
.and_then(|j| j.j().ok())
.and_then(|ptr| {
if ptr == 0 {
None
} else {
Some(ptr as *mut FooRef)
}
})
};
}
macro_rules! foo_mut {
($env: expr, $class: expr) => {
foo_mut_ptr!($env, $class).map(|ptr| &mut *ptr)
};
}
macro_rules! foo_ref {
($env: expr, $class: expr) => {
foo_ref_mut_ptr!($env, $class).map(|ptr| &*ptr)
};
}
#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_create(_env: JNIEnv, _class: JClass) -> jlong {
Box::into_raw(Box::new(Foo::default())) as jlong
}
#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_createRef(env: JNIEnv, class: JClass) -> jlong {
let foo = foo_mut!(env, class).expect("createRef was called on uninitialized Data");
let foo_ref = foo.create_ref();
Box::into_raw(Box::new(foo_ref)) as jlong
}
#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_reload(env: JNIEnv, class: JClass) {
let foo = foo_mut!(env, class).expect("foo must be initialized");
*foo = Foo {
data: vec!["hello".to_owned(); 1024 * 1024],
};
}
#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_destroy(env: JNIEnv, class: JClass) {
drop_ptr(foo_ref_mut_ptr!(env, class));
drop_ptr(foo_mut_ptr!(env, class));
}
#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_destroyRef(env: JNIEnv, class: JClass) {
drop_ptr(foo_ref_mut_ptr!(env, class));
}
unsafe fn drop_ptr<T>(ptr: Option<*mut T>) {
if let Some(ptr) = ptr {
let _foo = Box::from_raw(ptr);
// foo drops here
}
}
#[derive(Default)]
struct Foo {
data: Vec<String>,
}
#[derive(Default)]
struct FooRef<'a> {
data: HashMap<&'a str, Vec<&'a str>>,
}
impl Foo {
fn create_ref(&self) -> FooRef {
let mut data = HashMap::new();
for s in &self.data {
let s = &s[..];
data.insert(s, vec![s]);
}
FooRef { data }
}
}
Java:
package test;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
public class App implements AutoCloseable {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReadLock readLock = lock.readLock();
private final WriteLock writeLock = lock.writeLock();
private volatile long ptr;
private volatile long ptrRef;
private volatile boolean reload;
static {
System.loadLibrary("foo");
}
public static void main(String[] args) throws InterruptedException {
try (App app = new App()) {
for (int i = 0; i < 20; i++) {
new Thread(() -> {
while (true) {
app.tryReload();
}
}).start();
}
while (true) {
app.setReload();
}
}
}
public App() {
this.ptr = this.create();
}
public void setReload() {
writeLock.lock();
try {
reload = true;
} finally {
writeLock.unlock();
}
}
public void tryReload() {
readLock.lock();
debug("Got read lock");
if (reload) {
debug("Cache is expired");
readLock.unlock();
debug("Released read lock coz expired");
writeLock.lock();
debug("Got write lock");
try {
if (reload) {
fullReload();
}
readLock.lock();
debug("Got read lock inside write");
} finally {
writeLock.unlock();
debug("Released write lock");
}
}
readLock.unlock();
debug("Released read lock");
}
private void fullReload() {
destroyRef();
debug("Dropped ref");
debug("Reloading");
reload();
debug("Reloading completed");
updateRef();
debug("Created ref");
reload = false;
}
private void updateRef() {
this.ptrRef = this.createRef();
}
private native void reload();
private native long create();
private native long createRef();
private native void destroy();
private native void destroyRef();
@Override
public void close() {
writeLock.lock();
try {
this.destroy();
this.ptrRef = 0;
this.ptr = 0;
} finally {
writeLock.unlock();
}
}
private static void debug(String s) {
System.out.printf("%10s : %s%n", Thread.currentThread().getName(), s);
}
}
我认为是内存泄漏的问题实际上并不是内存泄漏。问题是分配器使用的是线程局部区域。因此,无论什么线程正在重新加载 250MB 的数据,都将分配的 space 保持原样,而不是将其返回给系统。这个问题不是 JNI 特有的,也发生在纯安全的 Rust 代码中。参见
在我的例子中,创建的默认竞技场数量默认为 8 * cpu count = 64。可以通过设置 MALLOC_ARENA_MAX
环境变量来覆盖此设置。
所以我通过将 MALLOC_ARENA_MAX
环境变量设置为 1 解决了这个问题。所以,我采取的方法很好。这只是特定于平台的问题。
此问题仅在 WSL 的 Ubuntu 中出现。我也在 Windows 10 上尝试了相同的代码,但没有任何调整,它运行完美,没有任何问题。