sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: wrong version number

sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: wrong version number

我有一个基于 Flask 的后端应用程序,当 运行 在本地运行代码时,不会发生此错误,但在我的服务器中部署在 kubernetes pod 中时会发生此错误。

有趣的是,当您第一次 运行 代码时,它会立即失败(当 运行 调用 Future 时)但是当您再次 运行 时错误仍然发生,但代码继续(并成功完成)。很奇怪。

我得到的错误是:

2021-07-30 14:59:23,537 - mit_backend.logic.optimize - INFO - Inside first pass run code
2021-07-30 14:59:23,587 - mit_backend.logic.optimize - INFO - Optimising region ML10F   
2021-07-30 14:59:23,588 - mit_backend.logic.optimize - INFO - Optimising region ML47F   
2021-07-30 14:59:23,589 - mit_backend.logic.optimize - INFO - Optimising region ML40F   
--- Logging error ---
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
psycopg2.OperationalError: SSL error: wrong version number


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/lib64/python3.6/concurrent/futures/process.py", line 175, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "/code/mit_backend/logic/optimize.py", line 241, in optimize_first_pass_run
    config = get_csv_config(user_identity)
  File "/code/mit_backend/modules/v1/__init__.py", line 623, in get_csv_config
    res = CSVConfig.query.filter(CSVConfig.user_id == user_identity).first()
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3429, in first
    ret = list(self[0:1])
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3203, in __getitem__
    return list(res)
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
    distilled_params,
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: wrong version number

[SQL: SELECT csv_config.user_id AS csv_config_user_id, csv_config."csvHeaders" AS "csv_config_csvHeaders", csv_config."deliveryName" AS "csv_config_deliveryName", csv_config.id AS csv_config_id, csv_config.line1 AS csv_config_line1, csv_config.line2 AS csv_config_line2, csv_config.quantity AS csv_config_quantity, csv_config."routeNumber" AS "csv_config_routeNumber", csv_config.suburb AS csv_config_suburb, csv_config.weight AS csv_config_weight 
FROM csv_config 
WHERE csv_config.user_id = %(user_id_1)s 
 LIMIT %(param_1)s]
[parameters: {'user_id_1': 'sftp', 'param_1': 1}]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/code/mit_backend/logic/optimize.py", line 300, in first_pass_run
    future_df_result = future.result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: wrong version number

[SQL: SELECT csv_config.user_id AS csv_config_user_id, csv_config."csvHeaders" AS "csv_config_csvHeaders", csv_config."deliveryName" AS "csv_config_deliveryName", csv_config.id AS csv_config_id, csv_config.line1 AS csv_config_line1, csv_config.line2 AS csv_config_line2, csv_config.quantity AS csv_config_quantity, csv_config."routeNumber" AS "csv_config_routeNumber", csv_config.suburb AS csv_config_suburb, csv_config.weight AS csv_config_weight 
FROM csv_config 
WHERE csv_config.user_id = %(user_id_1)s 
 LIMIT %(param_1)s]
[parameters: {'user_id_1': 'sftp', 'param_1': 1}]

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib64/python3.6/logging/__init__.py", line 994, in emit
    msg = self.format(record)
  File "/usr/lib64/python3.6/logging/__init__.py", line 840, in format
    return fmt.format(record)
  File "/usr/lib64/python3.6/logging/__init__.py", line 577, in format
    record.message = record.getMessage()
  File "/usr/lib64/python3.6/logging/__init__.py", line 338, in getMessage
    msg = msg % self.args
TypeError: not all arguments converted during string formatting
Call stack:
  File "/usr/lib64/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "/usr/lib64/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib64/python3.6/socketserver.py", line 654, in process_request_thread
    self.finish_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 364, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib64/python3.6/socketserver.py", line 724, in __init__
    self.handle()
  File "/usr/local/lib/python3.6/site-packages/werkzeug/serving.py", line 345, in handle
    BaseHTTPRequestHandler.handle(self)
  File "/usr/lib64/python3.6/http/server.py", line 418, in handle
    self.handle_one_request()
  File "/usr/local/lib/python3.6/site-packages/werkzeug/serving.py", line 379, in handle_one_request
    return self.run_wsgi()
  File "/usr/local/lib/python3.6/site-packages/werkzeug/serving.py", line 323, in run_wsgi
    execute(self.server.app)
  File "/usr/local/lib/python3.6/site-packages/werkzeug/serving.py", line 312, in execute
    application_iter = app(environ, start_response)
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 2464, in __call__
    return self.wsgi_app(environ, start_response)
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/usr/local/lib/python3.6/site-packages/flask_restx/api.py", line 375, in wrapper
    resp = resource(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/flask/views.py", line 89, in view
    return self.dispatch_request(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/flask_restx/resource.py", line 44, in dispatch_request
    resp = meth(*args, **kwargs)
  File "/code/mit_backend/modules/v1/controllers/manifest.py", line 161, in post
    files, df, sftp_date_latest = do_optimize(addresses, user_identity)
  File "/code/mit_backend/logic/optimize.py", line 38, in do_optimize
    df = first_pass_run(df, user_identity)
  File "/code/mit_backend/logic/optimize.py", line 307, in first_pass_run
    LOG.error("Exception is ", e.__cause__)
Message: 'Exception is '

Arguments: (_RemoteTraceback('\n"""\nTraceback (most recent call last):\n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context\n    cursor, statement, parameters, context\n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute\n    cursor.execute(statement, parameters)\npsycopg2.OperationalError: SSL error: decryption failed or bad record mac\n\n\nThe above exception was the direct cause of the following exception:\n\nTraceback (most recent call last):\n  File "/usr/lib64/python3.6/concurrent/futures/process.py", line 175, in _process_worker\n    r = call_item.fn(*call_item.args, **call_item.kwargs)\n  File "/code/mit_backend/logic/optimize.py", line 241, in optimize_first_pass_run\n    config = get_csv_config(user_identity)\n  File "/code/mit_backend/modules/v1/__init__.py", line 623, in get_csv_config\n    res = CSVConfig.query.filter(CSVConfig.user_id == user_identity).first()\n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3429, in first\n    ret = list(self[0:1])\n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3203, in __getitem__\n    return list(res)\n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__\n    return self._execute_and_instances(context)\n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances\n    result = conn.execute(querycontext.statement, self._params)\n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1011, in execute\n    return meth(self, multiparams, params)\n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection\n    return connection._execute_clauseelement(self, multiparams, params)\n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement\n    distilled_params,\n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context\n    e, statement, parameters, cursor, context\n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception\n    sqlalchemy_exception, with_traceback=exc_info[2], from_=e\n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_\n    raise exception\n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context\n    cursor, statement, parameters, context\n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute\n    cursor.execute(statement, parameters)\nsqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: decryption failed or bad record mac\n\n[SQL: SELECT csv_config.user_id AS csv_config_user_id, csv_config."csvHeaders" AS "csv_config_csvHeaders", csv_config."deliveryName" AS "csv_config_deliveryName", csv_config.id AS csv_config_id, csv_config.line1 AS csv_config_line1, csv_config.line2 AS csv_config_line2, csv_config.quantity AS csv_config_quantity, csv_config."routeNumber" AS "csv_config_routeNumber", csv_config.suburb AS csv_config_suburb, csv_config.weight AS csv_config_weight \nFROM csv_config \nWHERE csv_config.user_id = %(user_id_1)s \n LIMIT %(param_1)s]\n[parameters: {\'user_id_1\': \'sftp\', \'param_1\': 1}]\n(Background on this error at: http://sqlalche.me/e/13/e3q8)\n"""',),)

错误发生在 futures 中 ProcessPoolExecutor:

def first_pass_run(df, user_identity):
    LOG.info("Inside first pass run code")
    first_json = True
    df_final = None
    df_grouped = df.groupby('Route Number')
    auth_claim = get_authorization_claims_from_header()
    with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
        _futures = []
        for region_name, _df in df_grouped:
            _futures.append(
                executor.submit(
                    optimize_first_pass_run, region_name, _df, user_identity, auth_claim
                ))

        for future in concurrent.futures.as_completed(_futures):
            try:
                future_df_result = future.result()
                if first_json:
                    df_final = future_df_result
                    first_json = False
                else:
                    df_final = df_final.append(future_df_result, ignore_index=True)
            except Exception as e:
                LOG.error("Exception is ", e.__cause__)

我以悲观的方式启动我的 SQLAlchemy 引擎:

pg_db = SQLAlchemy(app)
engine = create_engine(app.config['SQLALCHEMY_DATABASE_URI'], pool_pre_ping=True)

代码不是 运行宁在 WSGI 或 gunicorn 上,它很简单:

if __name__ == '__main__':
    LOG.info('running environment: %s', os.environ.get('ENV', 'production'))
    app.config['DEBUG'] = os.environ.get('ENV') == 'development'
    app.run(host='0.0.0.0', port=5001, debug=False, threaded=True)

有任何线索说明为什么会发生这种情况吗?谢谢。

我一直没能找到合适的修复方法,但我实现了以下内容,虽然仍然抛出异常,但代码重试并最终成功:

正在删除连接池

engine = create_engine(app.config['SQLALCHEMY_DATABASE_URI'], poolclass=NullPool)

事件侦听器

@event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):
    connection_record.info['pid'] = os.getpid()


@event.listens_for(engine, "checkout")
def checkout(dbapi_connection, connection_record, connection_proxy):
    pid = os.getpid()
    if connection_record.info['pid'] != pid:
        connection_record.connection = connection_proxy.connection = None
        raise exc.DisconnectionError(
            "Connection record belongs to pid %s, "
            "attempting to check out in pid %s" %
            (connection_record.info['pid'], pid)
        )

但是,根据文档,上面的两个定义与处理连接和使用 Pools

的乐观方式有关

初始化前重新连接Process Pool

engine.dispose()
with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
    _futures = []
    for region_name, _df in df_grouped:
        _futures.append(
            executor.submit(
                optimize_first_pass_run, region_name, _df, user_identity, auth_claim, True
            ))

    for future in concurrent.futures.as_completed(_futures):
        future_df_result = future.result()

尝试在数据库中插入对象时添加重试

def add_new_record(row):
    attempts = 0
    while attempts <= 5:
        attempts = attempts + 1
        try:
            pg_db.session.add(row)
            pg_db.session.commit()
            return True
        except (SQLAlchemyError, psycopg2.OperationalError, sqlalchemy.exc.OperationalError) as e:
            if attempts < 5:
                LOG.warn("Attempt failed. Trying rollback")
                pg_db.session.rollback()
                LOG.warn(e.__cause__)
                LOG.warn(e.__str__())
                time.sleep(5)
            else:
                LOG.error("Maximum number of retries reached. Raising an error")
                raise e
    return False