Hadoop Cassandra 作业未读取所有输入行
Hadoop Cassandra job not reading all input rows
我有一个使用 Cassandra 作为输入和输出的非常简单的 Hadoop 作业。这是作业配置代码(没什么特别的):
Job job = new Job(getConf(), JOB_NAME);
job.setJarByClass(getClass());
job.setMapperClass(CassandraHadoopCounterMapper.class);
job.setReducerClass(CassandraHadoopCounterReducer.class);
job.setCombinerClass(CassandraHadoopCounterCombiner.class);
job.setInputFormatClass(CqlInputFormat.class);
job.setOutputFormatClass(CqlOutputFormat.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Map.class);
job.setOutputValueClass(List.class);
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, INPUT_COLUMN_FAMILY, WIDE_ROWS);
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setInputPartitioner(job.getConfiguration(), Murmur3Partitioner.class.getName());
ConfigHelper.setOutputPartitioner(job.getConfiguration(), Murmur3Partitioner.class.getName());
String query = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY + " SET c = ?";
CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
//aditional properties:
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "2000");
ConfigHelper.setInputSplitSize(job.getConfiguration(), 4 * 64 * 1024);
我的输入 cassandra table 有 10k 行。
在 hadoop 中我设置了 max mappers = 2
和 max reducers = 2
在工作计数器中,我可以看到以下内容:
Map input records=4000
即InputCQLPageRowSize * mappers
如果未设置 InputCQLPageRowSize
,则 Map input records
等于 2000(因为默认 InputCQLPageRowSize
为 1000)
我的问题:
如何让我的 hadoop 作业读取输入中的所有行 table?
作业 运行 完全在我的 PC 上本地完成。
我正在使用 Cassandra v2.0.11 和 Hadoop v1.0.4
我的问题与 cassandra 2.0.11 中的一个错误有关,该错误在底层 CQL 查询中添加了一个奇怪的 LIMIT 子句运行将数据读取到地图任务:
我将那个问题发布到了 cassandra jira:
https://issues.apache.org/jira/browse/CASSANDRA-9074
事实证明,该问题与 cassandra 2.0.12 中修复的以下错误密切相关:
https://issues.apache.org/jira/browse/CASSANDRA-8166
我有一个使用 Cassandra 作为输入和输出的非常简单的 Hadoop 作业。这是作业配置代码(没什么特别的):
Job job = new Job(getConf(), JOB_NAME);
job.setJarByClass(getClass());
job.setMapperClass(CassandraHadoopCounterMapper.class);
job.setReducerClass(CassandraHadoopCounterReducer.class);
job.setCombinerClass(CassandraHadoopCounterCombiner.class);
job.setInputFormatClass(CqlInputFormat.class);
job.setOutputFormatClass(CqlOutputFormat.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Map.class);
job.setOutputValueClass(List.class);
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, INPUT_COLUMN_FAMILY, WIDE_ROWS);
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setInputPartitioner(job.getConfiguration(), Murmur3Partitioner.class.getName());
ConfigHelper.setOutputPartitioner(job.getConfiguration(), Murmur3Partitioner.class.getName());
String query = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY + " SET c = ?";
CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
//aditional properties:
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "2000");
ConfigHelper.setInputSplitSize(job.getConfiguration(), 4 * 64 * 1024);
我的输入 cassandra table 有 10k 行。
在 hadoop 中我设置了 max mappers = 2
和 max reducers = 2
在工作计数器中,我可以看到以下内容:
Map input records=4000
即InputCQLPageRowSize * mappers
如果未设置 InputCQLPageRowSize
,则 Map input records
等于 2000(因为默认 InputCQLPageRowSize
为 1000)
我的问题: 如何让我的 hadoop 作业读取输入中的所有行 table?
作业 运行 完全在我的 PC 上本地完成。
我正在使用 Cassandra v2.0.11 和 Hadoop v1.0.4
我的问题与 cassandra 2.0.11 中的一个错误有关,该错误在底层 CQL 查询中添加了一个奇怪的 LIMIT 子句运行将数据读取到地图任务:
我将那个问题发布到了 cassandra jira: https://issues.apache.org/jira/browse/CASSANDRA-9074
事实证明,该问题与 cassandra 2.0.12 中修复的以下错误密切相关: https://issues.apache.org/jira/browse/CASSANDRA-8166