Class cast exception in map reduce job when writing data to MySQL database
I am trying to load data into a MySQL database using a map reduce job, but I am running into a class cast exception. Here is the process I followed:
I first created a DBOutputWritable class that implements the Writable and DBWritable interfaces.
Then I use my reduce task to write the data to the database, but when I run the job it fails with the following error:
java.lang.ClassCastException: com.amalwa.hadoop.DataBaseLoadMapReduce.DBOutputWritable cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat$DBRecordWriter.write(DBOutputFormat.java:66)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:601)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at com.amalwa.hadoop.DataBaseLoadMapReduce.DBMapReduce$DBReducer.reduce(DBMapReduce.java:58)
at com.amalwa.hadoop.DataBaseLoadMapReduce.DBMapReduce$DBReducer.reduce(DBMapReduce.java:53)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:663)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:426)
at org.apache.hadoop.mapred.Child.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
I am having a hard time figuring out why I get a class cast exception when my class implements the interfaces required for writing to the database from a map reduce job. I am implementing all the required functions.
Thanks.
DBOutputWritable:
package com.amalwa.hadoop.DataBaseLoadMapReduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class DBOutputWritable implements Writable, DBWritable {

    private String keyValue;
    private String response;

    public DBOutputWritable(String keyValue, String response) {
        this.keyValue = keyValue;
        this.response = response;
    }

    public void readFields(DataInput resultSet) throws IOException {
    }

    public void readFields(ResultSet resultSet) throws SQLException {
        keyValue = resultSet.getString(1);
        response = resultSet.getString(2);
    }

    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, keyValue);
        preparedStatement.setString(2, response);
    }

    public void write(DataOutput dataOutput) throws IOException {
    }
}
Reducer:
public static class DBReducer extends Reducer<Text, Text, DBOutputWritable, NullWritable> {

    public void reduce(Text requestKey, Iterable<Text> response, Context context) {
        for (Text responseSet : response) {
            try {
                context.write(new DBOutputWritable(requestKey.toString(), responseSet.toString()), NullWritable.get());
            } catch (IOException e) {
                System.err.println(e.getMessage());
            } catch (InterruptedException e) {
                System.err.println(e.getMessage());
            }
        }
    }
}
Mapper:
public static class DBMapper extends Mapper<LongWritable, Text, Text, Text> {

    public void map(LongWritable key, Text value, Context context) throws IOException {
        String tweetInfo = value.toString();
        String[] myTweetData = tweetInfo.split(",", 2);
        String requestKey = myTweetData[0];
        String response = myTweetData[1];
        try {
            context.write(new Text(requestKey), new Text(response));
        } catch (InterruptedException e) {
            System.err.println(e.getMessage());
        }
    }
}
Main class:
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://ec2-54-152-254-194.compute-1.amazonaws.com/TWEETS", "user", "password");
    Job job = new Job(conf);
    job.setJarByClass(DBMapReduce.class);
    job.setMapperClass(DBMapper.class);
    job.setReducerClass(DBReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(DBOutputWritable.class);
    job.setOutputValueClass(NullWritable.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(DBOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[1]));
    DBOutputFormat.setOutput(job, "TWEET_INFO", new String[] { "REQUESTKEY", "TWEET_DETAILS" });
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
It looks like you are mixing the old (org.apache.hadoop.mapred.*) and new (org.apache.hadoop.mapreduce.*) MapReduce APIs, and that is what is causing the conflict. I suspect your DBReducer class is extending the Reducer class from the new API, while your DBOutputWritable is implementing DBWritable from the old API.
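As a minimal sketch of the fix (assuming you keep the rest of the class exactly as posted), the only change DBOutputWritable needs is to import DBWritable from the new-API package, which is the type that DBOutputFormat is trying to cast to in your stack trace:

// old-API import that triggers the ClassCastException:
// import org.apache.hadoop.mapred.lib.db.DBWritable;

// new-API import, matching org.apache.hadoop.mapreduce.lib.db.DBOutputFormat:
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.io.Writable;

public class DBOutputWritable implements Writable, DBWritable {
    // fields, constructor, readFields(...) and write(...) stay the same as in your version
}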
You should stick to just one of these APIs throughout your implementation, which means that all of your imported MapReduce types start with the same package prefix.
Note that, in general, you implement MapReduce interfaces when using the old API and extend MapReduce base classes when using the new API.
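For reference, this is roughly the set of imports I would expect your job to use once everything is on the new API, so that every MapReduce type comes from org.apache.hadoop.mapreduce.* and nothing from org.apache.hadoop.mapred.* (a sketch based only on the classes visible in your code, not a complete program):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
// core new-API job, mapper and reducer classes
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
// database support from the new API
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
// input handling from the new API
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;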