Pickle object (model data) running into memory issue in Spark Streaming
Have been trying to apply a pickled model to predict on streaming data.
The model was initially almost 1 GB, and I thought reducing it might resolve the issue, so I pickled the object with a different protocol and compression and brought it down to 60 MB.
The input data stream is a JSON record, and the prediction is applied to 3 of its keys.
Pickle object creation:
Before:
joblib.dump(pipeline, 'itemc_nb.pkl')
Current:
joblib.dump(pipeline, 'itemc_nb.pkl', compress=1, protocol=-1)
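For reference, a minimal sketch of what those keyword arguments do, plus a quick on-disk size check (the size check is my addition, not part of the original script): protocol=-1 asks pickle for its highest protocol, and an integer compress level (0-9) turns on joblib's zlib compression.

# Sketch only: same dump call as above, with an on-disk size check added.
import os
from sklearn.externals import joblib  # joblib as vendored by this sklearn version

joblib.dump(pipeline, 'itemc_nb.pkl', compress=1, protocol=-1)
print('on-disk size: %.1f MB' % (os.path.getsize('itemc_nb.pkl') / 1024.0 / 1024.0))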
Another theory I tested was memory consumption on the edge node where the streaming script runs, as seen here.
It runs at around 70% under full load; the edge node has 22 GB of capacity.
Another thought was around how many times the model might be getting called versus garbage collected. How could I make sure it is loaded only once?
model = joblib.load(os.path.join(__location__, 'itemc_nb.pkl'))
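A minimal sketch of one way to address the load-once concern, assuming the prediction runs inside the Python workers and that __location__ is defined as in the line above: keep the model in a module-level variable and load it lazily, so each Python worker process unpickles the file at most once instead of on every call.

# Sketch: lazy, per-process singleton load. Assumes __location__ is defined
# elsewhere in the module, as in the original snippet.
import os
from sklearn.externals import joblib

_model = None  # unpickled at most once per Python worker process

def get_model():
    global _model
    if _model is None:
        _model = joblib.load(os.path.join(__location__, 'itemc_nb.pkl'))
    return _model

predict_result would then call get_model() instead of relying on a model loaded at import time.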
The function that evaluates the input string is called as below. There could be inefficiencies here that are also contributing to this issue.
def predict_result(text):
    ret_val = ''
    try:
        if text is not None and (type(text) == str or type(text) == unicode):
            text = text.strip()
            text = text.lower()
            text = ''.join([i for i in text if not i.isdigit()])
            text = ' '.join(text.split())
            text = ' '.join([word for word in text.split() if word not in (stopwords.words('english'))])
            text = text.split(' ', 0)
            if re.match(r"^([a-z]|[0-9])\b", text[0]):  # single letter removal
                return 'non-relevant'
            elif text[0] in ('n/a', 'na', '.', 'nada', 'no', 'xx', ''):  # cleaning list
                return 'non-relevant'
            elif not text[0]:
                return 'non-relevant'
            else:
                prediction = model.predict(text)
                cat_name = cat_dict.get(prediction[0], 'No key found')
                ret_val = cat_name
    except (AttributeError, KeyError) as e:
        ret_val = 'error'
    return ret_val
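One concrete inefficiency in the function above: stopwords.words('english') sits inside the comprehension's condition, so the NLTK stopword list is rebuilt for every word of every record. A small sketch (not the original code) that builds the set once at module load:

# Sketch: precompute the stopword set once instead of per word.
from nltk.corpus import stopwords

ENGLISH_STOPWORDS = frozenset(stopwords.words('english'))

def remove_stopwords(text):
    # Same filtering step as in predict_result, but membership is tested
    # against a set built once at import time.
    return ' '.join(word for word in text.split() if word not in ENGLISH_STOPWORDS)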
Looking for some input at this point.
Exception encountered while processing data:
An error occurred while calling o394689.insertInto.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 366.0 failed 1 times, most recent failure: Lost task 0.0 in stage 366.0 (TID 366, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/serializers.py", line 442, in loads
return pickle.loads(obj)
File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 700, in subimport
__import__(name)
File "/tmp/spark-9e6c86f3-4d80-4bef-833e-e5a225d2824f/userFiles-1784ee88-ee98-467d-9abd-f017cccecf49/streaming_models.zip/itemc/itemc_tagger.py", line 14, in <module>
File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 578, in load
obj = _unpickle(fobj, filename, mmap_mode)
File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 508, in _unpickle
obj = unpickler.load()
File "/opt/rh/python27/root/usr/lib64/python2.7/pickle.py", line 864, in load
dispatch[key](self)
File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 341, in load_build
self.stack.append(array_wrapper.read(self))
File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 184, in read
array = self.read_array(unpickler)
File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 130, in read_array
array = unpickler.np.empty(count, dtype=self.dtype)
MemoryError: (MemoryError(), <function subimport at 0x7f1d4f353050>, ('itemc.itemc_tagger',))
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute.apply(python.scala:405)
at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute.apply(python.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:717)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$$anonfun$apply.apply(RDD.scala:717)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute.apply(CoalescedRDD.scala:96)
at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute.apply(CoalescedRDD.scala:95)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile(InsertIntoHiveTable.scala:170)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile.apply(InsertIntoHiveTable.scala:150)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile.apply(InsertIntoHiveTable.scala:150)
This was a memory error caused by the large .pkl file interacting with the live streaming data. It was resolved by deploying a compressed model:
joblib.dump(pipeline, 'item.pkl', compress=1, protocol=-1)
The model size went from 1 GB to 60 MB.
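One caveat worth noting: compression shrinks the file on disk and whatever gets shipped to the workers, but joblib still inflates the full numpy arrays in memory when the model is loaded. If memory at load time were still tight, an alternative (not used in this answer) is to memory-map an uncompressed dump, since mmap_mode only works on uncompressed files:

# Alternative sketch: memory-map an *uncompressed* joblib dump so large numpy
# arrays are paged from disk instead of fully allocated at load time.
from sklearn.externals import joblib

joblib.dump(pipeline, 'item_uncompressed.pkl')                # no compress= -> mmap-able
model = joblib.load('item_uncompressed.pkl', mmap_mode='r')   # arrays become read-only memmaps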