如何以不同于其他列的方式映射静态列?
How to map static columns differently from other columns?
如何使用 datastax Java 对象映射将 the example given here 映射到下面的 class?
public class User {
private int user;
private int balance;
private List<Bill> bills;
}
public class Bill {
private String description;
private int amount;
}
关于 java-驱动程序中的映射模块,静态列不需要与非静态列区别对待。不过,您会担心的一个问题是,您需要某种一致性,因为只有当余额达到预期值时才会更新余额,因此仅使用 Mapper 的保存方法是不够的。相反,您将进行一批有条件的余额更新,并在同一批次中更新费用。
为了方便使用 Mapper,您可以使用 Accessor-annotated interface 来定义您的查询并将它们映射回您的对象。然后,您可以创建一个数据访问对象,用于使用映射器对象和其他一些方法与 Cassandra 交互。
这需要一些工作,但我认为它为您提供了一种很好的干净方法,可以将您的解决方案从 Cassandra 中抽象出来,同时仍然以惯用的方式使用它。另一种选择是查看 Achilles which is a more advanced object persistence manager for Cassandra. Kundera and Spring Data 其他可能的选项。
首先,让我们看看您的 classes 并将它们映射到博客示例中定义的 table:
CREATE TABLE bills (
user text,
balance int static,
expense_id int,
amount int,
description text,
paid boolean,
PRIMARY KEY (user, expense_id)
);
根据您的示例,我怀疑您可能希望使用用户定义的类型而不是帐单的单独列,但是由于您标记了此 post 'cassandra-2.0' 并且未引入 UDT在 2.1 之前,我不会介绍这个,但如果你想让我详细说明,我可以。
让我们定义我们的 class Bill
:
@Table(name="bills")
public class Bill {
@PartitionKey
private String user;
private int balance;
@ClusteringColumn
@Column(name="expense_id")
private int expenseId;
private int amount;
private String description;
private boolean paid;
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public int getBalance() {
return balance;
}
public void setBalance(int balance) {
this.balance = balance;
}
public int getExpenseId() {
return expenseId;
}
public void setExpenseId(int expenseId) {
this.expenseId = expenseId;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public boolean isPaid() {
return paid;
}
public void setPaid(boolean paid) {
this.paid = paid;
}
}
我们还定义一个 BillAccessor
用于与我们在 cassandra 中的账单交互,将它们映射回 Bill
对象。这应该涵盖博客 post:
中的所有查询
@Accessor
public interface BillAccessor {
@Query("INSERT INTO bills (user, balance) VALUES (?, ?) IF NOT EXISTS")
BoundStatement addUser(String user, int balance);
@Query("UPDATE bills SET balance = :newBalance WHERE user = :user IF balance = :currentBalance")
BoundStatement updateBalance(@Param("user") String user, @Param("currentBalance") int currentBalance,
@Param("newBalance") int newBalance);
@Query("SELECT balance from bills where user=?")
ResultSet getBalance(String user);
@Query("INSERT INTO bills (user, expense_id, amount, description, paid) values (?, ?, ?, ?, false) IF NOT EXISTS")
BoundStatement addBill(String user, int expenseId, int amount, String description);
@Query("UPDATE bills set paid=true where user=? and expense_id=? IF paid=false")
BoundStatement markBillPaid(String user, int expenseId);
@Query("SELECT * from bills where user=?")
Result<Bill> getBills(String user);
}
接下来我们将创建一个 DAO,使用 Bill
class 和 BillAccessor
:
来连接您的账单
public class BillDao {
private final Session session;
private final Mapper<Bill> mapper;
private final BillAccessor accessor;
public BillDao(Session session) {
this.session = session;
MappingManager manager = new MappingManager(session);
this.mapper = manager.mapper(Bill.class);
this.accessor = manager.createAccessor(BillAccessor.class);
}
public Integer getBalance(String user) {
ResultSet result = accessor.getBalance(user);
Row row = result.one();
if(row == null) {
return null;
} else {
return row.getInt(0);
}
}
public Iterable<Bill> getBills(String user) {
return accessor.getBills(user);
}
public Bill getBill(String user, int expenseId) {
return mapper.get(user, expenseId);
}
public int addBill(String user, int expenseId, int amount, String description) throws UpdateException {
BatchStatement batch = new BatchStatement();
Integer balance = getBalance(user);
if (balance == null) {
balance = 0;
// we need to create the user.
batch.add(accessor.addUser(user, balance - amount));
} else {
// we need to update the users balance.
batch.add(accessor.updateBalance(user, balance, balance - amount));
}
batch.add(accessor.addBill(user, expenseId, amount, description));
ResultSet result = session.execute(batch);
if (result.wasApplied()) {
return balance - amount;
} else {
throw new UpdateException("Failed applying bill, conditional update failed.");
}
}
public int payForBill(Bill bill) throws UpdateException {
Integer balance = getBalance(bill.getUser());
if(balance == null) {
throw new UpdateException("Failed paying for bill, user doesn't exist!");
}
BatchStatement batch = new BatchStatement();
batch.add(accessor.updateBalance(bill.getUser(), balance, bill.getAmount() + balance));
batch.add(accessor.markBillPaid(bill.getUser(), bill.getExpenseId()));
ResultSet result = session.execute(batch);
if(result.wasApplied()) {
return bill.getAmount() + balance;
} else {
throw new UpdateException("Failed paying for bill, conditional update failed.");
}
}
public class UpdateException extends Exception {
public UpdateException(String msg) {
super(msg);
}
}
}
请注意,我们通过检查 ResultSet.wasApplied() 来检查是否应用了更改。由于我们正在进行条件更新,因此如果我们的条件不成立,则可能不会应用更改。如果未应用更改,DAO 将简单地抛出一个 UpdateException
,但您可以选择不同的策略,例如在 DAO 中重试任意次数。
最后让我们写一些代码来练习 DAO:
Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
try {
Session session = cluster.connect("readtest");
BillDao billDao = new BillDao(session);
String user = "chandru";
// Create a bill, should exercise user create logic.
int balance = billDao.addBill(user, 1, 10, "Sandwich");
System.out.format("Bill %s/%d created, current balance is %d.%n", user, 1, balance);
// Create another bill, should exercise balance update logic.
balance = billDao.addBill(user, 2, 6, "Salad");
System.out.format("Bill %s/%d created, current balance is %d.%n", user, 2, balance);
// Pay for all the bills!
for(Bill bill : billDao.getBills(user)) {
balance = billDao.payForBill(bill);
System.out.format("Paid for %s/%d, current balance is %d.%n", user, bill.getExpenseId(), balance);
// Ensure bill was paid.
Bill newBill = billDao.getBill(user, bill.getExpenseId());
System.out.format("Is %s/%d paid for?: %b.%n", user, newBill.getExpenseId(), newBill.isPaid());
}
// Try to add another bill with an already used expense id.
try {
billDao.addBill(user, 1, 1, "Diet Coke");
} catch(BillDao.UpdateException ex) {
System.err.format("Could not add bill %s/%d: %s", user, 1, ex.getMessage());
}
} finally {
cluster.close();
}
如果一切顺利,您应该观察到以下输出:
Bill chandru/1 created, current balance is -10.
Bill chandru/2 created, current balance is -16.
Paid for chandru/1, current balance is -6.
Is chandru/1 paid for?: true.
Paid for chandru/2, current balance is 0.
Is chandru/2 paid for?: true.
Could not add bill chandru/1: Failed applying bill, conditional update failed.
你的 table 状态将是:
cqlsh:readtest> select * from bills;
user | expense_id | balance | amount | description | paid
---------+------------+---------+--------+-------------+------
chandru | 1 | 0 | 10 | Sandwich | True
chandru | 2 | 0 | 6 | Salad | True
如何使用 datastax Java 对象映射将 the example given here 映射到下面的 class?
public class User {
private int user;
private int balance;
private List<Bill> bills;
}
public class Bill {
private String description;
private int amount;
}
关于 java-驱动程序中的映射模块,静态列不需要与非静态列区别对待。不过,您会担心的一个问题是,您需要某种一致性,因为只有当余额达到预期值时才会更新余额,因此仅使用 Mapper 的保存方法是不够的。相反,您将进行一批有条件的余额更新,并在同一批次中更新费用。
为了方便使用 Mapper,您可以使用 Accessor-annotated interface 来定义您的查询并将它们映射回您的对象。然后,您可以创建一个数据访问对象,用于使用映射器对象和其他一些方法与 Cassandra 交互。
这需要一些工作,但我认为它为您提供了一种很好的干净方法,可以将您的解决方案从 Cassandra 中抽象出来,同时仍然以惯用的方式使用它。另一种选择是查看 Achilles which is a more advanced object persistence manager for Cassandra. Kundera and Spring Data 其他可能的选项。
首先,让我们看看您的 classes 并将它们映射到博客示例中定义的 table:
CREATE TABLE bills (
user text,
balance int static,
expense_id int,
amount int,
description text,
paid boolean,
PRIMARY KEY (user, expense_id)
);
根据您的示例,我怀疑您可能希望使用用户定义的类型而不是帐单的单独列,但是由于您标记了此 post 'cassandra-2.0' 并且未引入 UDT在 2.1 之前,我不会介绍这个,但如果你想让我详细说明,我可以。
让我们定义我们的 class Bill
:
@Table(name="bills")
public class Bill {
@PartitionKey
private String user;
private int balance;
@ClusteringColumn
@Column(name="expense_id")
private int expenseId;
private int amount;
private String description;
private boolean paid;
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public int getBalance() {
return balance;
}
public void setBalance(int balance) {
this.balance = balance;
}
public int getExpenseId() {
return expenseId;
}
public void setExpenseId(int expenseId) {
this.expenseId = expenseId;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public boolean isPaid() {
return paid;
}
public void setPaid(boolean paid) {
this.paid = paid;
}
}
我们还定义一个 BillAccessor
用于与我们在 cassandra 中的账单交互,将它们映射回 Bill
对象。这应该涵盖博客 post:
@Accessor
public interface BillAccessor {
@Query("INSERT INTO bills (user, balance) VALUES (?, ?) IF NOT EXISTS")
BoundStatement addUser(String user, int balance);
@Query("UPDATE bills SET balance = :newBalance WHERE user = :user IF balance = :currentBalance")
BoundStatement updateBalance(@Param("user") String user, @Param("currentBalance") int currentBalance,
@Param("newBalance") int newBalance);
@Query("SELECT balance from bills where user=?")
ResultSet getBalance(String user);
@Query("INSERT INTO bills (user, expense_id, amount, description, paid) values (?, ?, ?, ?, false) IF NOT EXISTS")
BoundStatement addBill(String user, int expenseId, int amount, String description);
@Query("UPDATE bills set paid=true where user=? and expense_id=? IF paid=false")
BoundStatement markBillPaid(String user, int expenseId);
@Query("SELECT * from bills where user=?")
Result<Bill> getBills(String user);
}
接下来我们将创建一个 DAO,使用 Bill
class 和 BillAccessor
:
public class BillDao {
private final Session session;
private final Mapper<Bill> mapper;
private final BillAccessor accessor;
public BillDao(Session session) {
this.session = session;
MappingManager manager = new MappingManager(session);
this.mapper = manager.mapper(Bill.class);
this.accessor = manager.createAccessor(BillAccessor.class);
}
public Integer getBalance(String user) {
ResultSet result = accessor.getBalance(user);
Row row = result.one();
if(row == null) {
return null;
} else {
return row.getInt(0);
}
}
public Iterable<Bill> getBills(String user) {
return accessor.getBills(user);
}
public Bill getBill(String user, int expenseId) {
return mapper.get(user, expenseId);
}
public int addBill(String user, int expenseId, int amount, String description) throws UpdateException {
BatchStatement batch = new BatchStatement();
Integer balance = getBalance(user);
if (balance == null) {
balance = 0;
// we need to create the user.
batch.add(accessor.addUser(user, balance - amount));
} else {
// we need to update the users balance.
batch.add(accessor.updateBalance(user, balance, balance - amount));
}
batch.add(accessor.addBill(user, expenseId, amount, description));
ResultSet result = session.execute(batch);
if (result.wasApplied()) {
return balance - amount;
} else {
throw new UpdateException("Failed applying bill, conditional update failed.");
}
}
public int payForBill(Bill bill) throws UpdateException {
Integer balance = getBalance(bill.getUser());
if(balance == null) {
throw new UpdateException("Failed paying for bill, user doesn't exist!");
}
BatchStatement batch = new BatchStatement();
batch.add(accessor.updateBalance(bill.getUser(), balance, bill.getAmount() + balance));
batch.add(accessor.markBillPaid(bill.getUser(), bill.getExpenseId()));
ResultSet result = session.execute(batch);
if(result.wasApplied()) {
return bill.getAmount() + balance;
} else {
throw new UpdateException("Failed paying for bill, conditional update failed.");
}
}
public class UpdateException extends Exception {
public UpdateException(String msg) {
super(msg);
}
}
}
请注意,我们通过检查 ResultSet.wasApplied() 来检查是否应用了更改。由于我们正在进行条件更新,因此如果我们的条件不成立,则可能不会应用更改。如果未应用更改,DAO 将简单地抛出一个 UpdateException
,但您可以选择不同的策略,例如在 DAO 中重试任意次数。
最后让我们写一些代码来练习 DAO:
Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
try {
Session session = cluster.connect("readtest");
BillDao billDao = new BillDao(session);
String user = "chandru";
// Create a bill, should exercise user create logic.
int balance = billDao.addBill(user, 1, 10, "Sandwich");
System.out.format("Bill %s/%d created, current balance is %d.%n", user, 1, balance);
// Create another bill, should exercise balance update logic.
balance = billDao.addBill(user, 2, 6, "Salad");
System.out.format("Bill %s/%d created, current balance is %d.%n", user, 2, balance);
// Pay for all the bills!
for(Bill bill : billDao.getBills(user)) {
balance = billDao.payForBill(bill);
System.out.format("Paid for %s/%d, current balance is %d.%n", user, bill.getExpenseId(), balance);
// Ensure bill was paid.
Bill newBill = billDao.getBill(user, bill.getExpenseId());
System.out.format("Is %s/%d paid for?: %b.%n", user, newBill.getExpenseId(), newBill.isPaid());
}
// Try to add another bill with an already used expense id.
try {
billDao.addBill(user, 1, 1, "Diet Coke");
} catch(BillDao.UpdateException ex) {
System.err.format("Could not add bill %s/%d: %s", user, 1, ex.getMessage());
}
} finally {
cluster.close();
}
如果一切顺利,您应该观察到以下输出:
Bill chandru/1 created, current balance is -10.
Bill chandru/2 created, current balance is -16.
Paid for chandru/1, current balance is -6.
Is chandru/1 paid for?: true.
Paid for chandru/2, current balance is 0.
Is chandru/2 paid for?: true.
Could not add bill chandru/1: Failed applying bill, conditional update failed.
你的 table 状态将是:
cqlsh:readtest> select * from bills;
user | expense_id | balance | amount | description | paid
---------+------------+---------+--------+-------------+------
chandru | 1 | 0 | 10 | Sandwich | True
chandru | 2 | 0 | 6 | Salad | True