Python SQLAlchemy Flask Sqlite 只能从 "python crud.py" 打开数据库。当应用程序为 运行 时导入时,它无法打开数据库
Python SQLAlchemy Flask Sqlite can open database only from "python crud.py". When imported while the app is running, it cant open the database
app.py
from pprint import pprint
from flask import Flask, render_template
from flask_restful import Api, Resource
from restApi import crud
app = Flask(__name__)
api = Api(app)
@app.route('/aad')
def hello_world():
return "hello world"
@app.route('/')
def hello_worldp():
return render_template("index.html")
class HelloWorld(Resource):
def get(self):
return {"prova": "sono una prova"}
api.add_resource(HelloWorld, "/HelloWorld")
api.add_resource(crud.ProvaApi, "/c")
api.add_resource(crud.getAllColumns, "/allColumns")
if __name__ == '__main__':
app.run(debug=True)
test.py
from pprint import pprint
from sqlalchemy.orm import joinedload
from flask import jsonify
from data import schema
def test1():
c = schema.Colonne(titolo="prova5", stato="boh")
cont1 = schema.Contenuto(testo="prova Contenuto1")
cont2 = schema.Contenuto(url="nonsourl")
t = schema.Tile(titolo="provaTile", autore="nonso", contenuto=cont1.id)
t2 = schema.Tile(titolo="provaTile", autore="nonso", contenuto=cont2.id)
session = schema.dbSession()
# c.tile.append(t)
#c.tile.append(t2)
#c.tile[0].contenuto.add(cont1)
#c.tile[0].contenuto.add(cont2)
# session.add(c)
#session.commit()
colschema = schema.ColonneSchema()
tileschema = schema.TileSchema()
contschema = schema.ContenutoSchema()
qr = session.query(schema.Colonne, schema.Tile, schema.Contenuto).\
select_from(schema.Colonne).\
join(schema.Tile, schema.Colonne.titolo == schema.Tile.colonna_id, isouter=True).\
join(schema.Contenuto, schema.Tile.contenuto == schema.Contenuto.id, isouter=True).all()
#qr = qr.filter(schema.Colonne.titolo == "prova5")
#record = qr.one()
print(qr)
#ti = record.tile.get(0).filter(schema.Tile.id == 1)
#ti.contenuto.add(cont1)
qp = session.query(schema.Tile);
qp = qp.filter(schema.Tile.id == "1")
record = qp.one()
record.contenuto = cont1.id
session.commit()
qr = session.query(schema.Colonne, schema.Tile, schema.Contenuto). \
select_from(schema.Colonne). \
join(schema.Tile, schema.Colonne.titolo == schema.Tile.colonna_id, isouter=True). \
join(schema.Contenuto, schema.Tile.contenuto == schema.Contenuto.id, isouter=True).all()
for r1, r2, r3 in qr:
print(colschema.dump(r1))
print(tileschema.dump(r2))
print(contschema.dump(r3))
def test2():
c = schema.Colonne(titolo="prova4", stato="boh")
t = schema.Tile(titolo="provaTile", autore="nonso")
t2 = schema.Tile(titolo="provaTile", autore="nonso")
cont = schema.Contenuto(testo="prova Contenuto", url ="nonsourl")
session = schema.dbSession()
c.tiles.append(t)
c.tiles[0].contenuto = cont
dum = schema.ColonneSchema().dump(c)
#print(c.tiles[0].contenuto)
#print(dum)
#c.tiles.append(t2)
#c.tiles[0].append(cont)
session.add(c)
session.commit()
#tiles
#cols = session.query(schema.Tile).all()
# print(jsonify(cols))
#t_schema = schema.TileSchema()
# print(c_schema.dump(cols))
#for col in cols:
# print(jsonify(col))
# print(t_schema.dump(col))
def test3():
session = schema.dbSession()
query = "select Colonne.titolo, Colonne.stato, Tile.id, Tile.autore, Tile.contenuto, Contenuto.testo,Contenuto.url" \
" from Colonne inner join Tile" \
" on Colonne.titolo = Tile.colonna_id left join " \
"Contenuto on Contenuto.id = Tile.contenuto;"
rs = session.execute(query)
#row_headers = [x[0] for x in rs.description]
resultset = [dict(row) for row in rs]
print(resultset)
#for row in rs:
# print(jsonify(row))
def test4():
query="insert into Contenuto values ('gna', 'codio')"
session = schema.dbSession()
rs = session.execute(query)
def test5():
query = "SELECT * FROM Contenuto"
session = schema.dbSession()
rs = session.execute(query)
presultset = [dict(row) for row in rs]
print(presultset)
def test6():
session = schema.dbSession()
rs = session.execute("PRAGMA foreign_keys;")
for row in rs:
print(row)
def test7():
session = schema.dbSession()
rs1 = session.query(schema.Colonne).select_from(schema.Colonne).join(schema.Tile).join(schema.Contenuto).all()
rs = session.query(schema.Colonne).all()
scCol = schema.ColonneSchema(many=True)
print(scCol.dump(rs))
#for row in rs:
# print(scCol.dump(rs))
def test8():
c = schema.Colonne(titolo="prova4", stato="boh")
t = schema.Tile(titolo="provaTile", autore="nonso", colonna = c)
t.colonna = c
cont = schema.Contenuto(testo="prova Contenuto", url="nonsourl")
t.contenuto = cont
rs = schema.ContenutoSchema().dump(cont)
pprint(rs)
rs = schema.ColonneSchema().dump(c)
pprint(rs)
def test9():
session = schema.dbSession()
rs = session.query(schema.Colonne).all()
pprint(schema.ColonneSchema(many=True).dump(rs))
def test10():
session = schema.dbSession()
colonne = session.query(schema.Colonne).all()
dizionario = {}
rs = schema.ColonneSchema(many=True).dump(colonne)
tileSchema = schema.TileSchema()
contenutoSchema = schema.ContenutoSchema()
pprint(rs)
listaTot= []
listaColonne = []
for col in colonne:
dizionario = {}
print("***************COlonna********************")
print(schema.ColonneSchema().dump(col))
listaTile = []
dizionario['colonne'] = schema.ColonneSchema().dump(col)
for tile in col.tile:
print("***************Tile********************")
#pprint(tileSchema.dump(tile))
print("***************Contenuto********************")
#pprint(contenutoSchema.dump(tile.contenuto))
print("***************Contenuto********************")
print("***************Tile********************")
tiletmp= tileSchema.dump(tile)
tiletmp['contenuto'] = contenutoSchema.dump(tile.contenuto)
listaTile.append(tiletmp)
dizionario['tile'] = listaTile
print("***************COlonna********************")
listaTot.append(dizionario)
return listaTot
def test11():
session = schema.dbSession()
colonne = session.query(schema.Colonne).all()
dizionario = {}
rs = schema.ColonneSchema(many=True).dump(colonne)
tileSchema = schema.TileSchema()
contenutoSchema = schema.ContenutoSchema()
pprint(rs)
listaColonne= []
for col in colonne:
print(schema.ColonneSchema().dump(col))
def test12():
sess = schema.dbSession()
r=sess.query(schema.Colonne).filter_by(titolo="prova4").first()
sess.delete(r)
sess.commit()
print(r)
def test13():#update
sess = schema.dbSession()
r=sess.query(schema.Colonne).filter_by(titolo="prova4").first()
r.titolo ="provagay"
sess.add(r)
sess.commit()
print(r)
def test14():#cancella elemento
sess = schema.dbSession()
r=sess.query(schema.Tile).filter_by(id="1").first()
sess.delete(r)
sess.commit()
print(r)
def test15():#update elemento
session = schema.dbSession()
c=session.query(schema.Colonne).filter_by(titolo="prova4").first()
t = schema.Tile(titolo="provaTile", autore="nonso")
t2 = schema.Tile(titolo="provaTile", autore="nonso")
cont = schema.Contenuto(testo="prova Contenuto", url ="nonsourl")
c.tiles.append(t)
c.tiles[0].contenuto = cont
dum = schema.ColonneSchema().dump(c)
#print(c.tiles[0].contenuto)
#print(dum)
#c.tiles.append(t2)
#c.tiles[0].append(cont)
session.add(c)
session.commit()
def query_to_dict(ret):
if ret is not None:
return [{key: value for key, value in row.items()} for row in ret if row is not None]
else:
return [{}]
if __name__ == "__main__":
pprint(test10())
schema.py
from flask import Flask
from marshmallow_sqlalchemy.fields import Nested
from sqlalchemy import Column, Integer, String, create_engine, ForeignKey
from sqlalchemy.orm import relationship, scoped_session, sessionmaker, joinedload,backref
from sqlalchemy.ext.declarative import declarative_base
from marshmallow_sqlalchemy import SQLAlchemyAutoSchema, fields
from flask_marshmallow import Marshmallow
app = Flask(__name__)
Base = declarative_base()
engine = create_engine('sqlite:///../database/mydb.db', convert_unicode=True)
dbSession = scoped_session(sessionmaker(autocommit=False,
autoflush=False,
bind=engine,
))
ma = Marshmallow(app)
class Colonne(Base):
__tablename__ = "Colonne"
id = Column(Integer,primary_key=True,autoincrement=True)
titolo = Column(String, unique=True)
stato = Column(String)
tile = relationship("Tile", back_populates ="colonna", cascade="all,delete")
class Tile(Base):
__tablename__ = "Tile"
id = Column(Integer, primary_key=True, autoincrement=True)
autore = Column(String)
titolo = Column(String)
colonna_id = Column(Integer, ForeignKey("Colonne.id"))
colonna = relationship("Colonne", backref="tiles", overlaps="tile")
contenuto_id = Column(Integer, ForeignKey('Contenuto.id', ondelete='CASCADE'))
contenuto = relationship("Contenuto", backref="tile", cascade="all,delete")
class Contenuto(Base):
__tablename__ = "Contenuto"
id = Column(Integer, primary_key=True, autoincrement=True )
testo = Column(String)
url = Column(String)
class ColonneSchema(SQLAlchemyAutoSchema):
class Meta:
model = Colonne
class TileSchema(SQLAlchemyAutoSchema):
class Meta:
model = Tile
colonna = Nested(ColonneSchema)
class ContenutoSchema(SQLAlchemyAutoSchema):
class Meta:
model = Contenuto
include_fk = True
include_relationships = True
load_instance = False
tile = fields.Nested(TileSchema)
# -----------------------crea tabelle----------------------------
if __name__ == '__main__':
Base.metadata.create_all(bind=engine)
crud.py
from flask_restful import Api, Resource
from data import schema
from pprint import pprint
def getColonne():
session = schema.dbSession()
colonne = session.query(schema.Colonne).all()
dizionario = {}
rs = schema.ColonneSchema(many=True).dump(colonne)
tileSchema = schema.TileSchema()
contenutoSchema = schema.ContenutoSchema()
# pprint(rs)
listaTot = []
listaColonne = []
for col in colonne:
dizionario = {}
# print("***************COlonna********************")
# print(schema.ColonneSchema().dump(col))
listaTile = []
dizionario['colonne'] = schema.ColonneSchema().dump(col)
for tile in col.tile:
# print("***************Tile********************")
# pprint(tileSchema.dump(tile))
# print("***************Contenuto********************")
# pprint(contenutoSchema.dump(tile.contenuto))
# print("***************Contenuto********************")
# print("***************Tile********************")
tiletmp = tileSchema.dump(tile)
tiletmp['contenuto'] = contenutoSchema.dump(tile.contenuto)
listaTile.append(tiletmp)
dizionario['tile'] = listaTile
# print("***************COlonna********************")
listaTot.append(dizionario)
return listaTot
if __name__ == "__main__":
pprint(getColonne())
class ProvaApi(Resource):
def get(self):
return {"prova": "sono una prova"}
class getAllColumns(Resource):
def get(self):
return getColonne()
现在,当我 运行 "python crud.py" getColonne()
工作时。
否则,当端点被调用时,当应用程序 运行 正在“getAllColumns”时,它无法打开数据库 sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) unable to打开数据库文件.
结构是这样的:
项目
数据
schema.py
数据库
mydb.db
休息API
crud.py
测试
test.py
app.py
当然,虽然 运行 宁所有 test.py 我没有问题。
有什么问题?
您在 create_engine()
中使用了相对路径。使用绝对路径将解决您的问题。如果它相对于文件很重要,请使用 create_engine(f"sqlite:///{os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'database/mydb.db')"})
.
之类的东西
app.py
from pprint import pprint
from flask import Flask, render_template
from flask_restful import Api, Resource
from restApi import crud
app = Flask(__name__)
api = Api(app)
@app.route('/aad')
def hello_world():
return "hello world"
@app.route('/')
def hello_worldp():
return render_template("index.html")
class HelloWorld(Resource):
def get(self):
return {"prova": "sono una prova"}
api.add_resource(HelloWorld, "/HelloWorld")
api.add_resource(crud.ProvaApi, "/c")
api.add_resource(crud.getAllColumns, "/allColumns")
if __name__ == '__main__':
app.run(debug=True)
test.py
from pprint import pprint
from sqlalchemy.orm import joinedload
from flask import jsonify
from data import schema
def test1():
c = schema.Colonne(titolo="prova5", stato="boh")
cont1 = schema.Contenuto(testo="prova Contenuto1")
cont2 = schema.Contenuto(url="nonsourl")
t = schema.Tile(titolo="provaTile", autore="nonso", contenuto=cont1.id)
t2 = schema.Tile(titolo="provaTile", autore="nonso", contenuto=cont2.id)
session = schema.dbSession()
# c.tile.append(t)
#c.tile.append(t2)
#c.tile[0].contenuto.add(cont1)
#c.tile[0].contenuto.add(cont2)
# session.add(c)
#session.commit()
colschema = schema.ColonneSchema()
tileschema = schema.TileSchema()
contschema = schema.ContenutoSchema()
qr = session.query(schema.Colonne, schema.Tile, schema.Contenuto).\
select_from(schema.Colonne).\
join(schema.Tile, schema.Colonne.titolo == schema.Tile.colonna_id, isouter=True).\
join(schema.Contenuto, schema.Tile.contenuto == schema.Contenuto.id, isouter=True).all()
#qr = qr.filter(schema.Colonne.titolo == "prova5")
#record = qr.one()
print(qr)
#ti = record.tile.get(0).filter(schema.Tile.id == 1)
#ti.contenuto.add(cont1)
qp = session.query(schema.Tile);
qp = qp.filter(schema.Tile.id == "1")
record = qp.one()
record.contenuto = cont1.id
session.commit()
qr = session.query(schema.Colonne, schema.Tile, schema.Contenuto). \
select_from(schema.Colonne). \
join(schema.Tile, schema.Colonne.titolo == schema.Tile.colonna_id, isouter=True). \
join(schema.Contenuto, schema.Tile.contenuto == schema.Contenuto.id, isouter=True).all()
for r1, r2, r3 in qr:
print(colschema.dump(r1))
print(tileschema.dump(r2))
print(contschema.dump(r3))
def test2():
c = schema.Colonne(titolo="prova4", stato="boh")
t = schema.Tile(titolo="provaTile", autore="nonso")
t2 = schema.Tile(titolo="provaTile", autore="nonso")
cont = schema.Contenuto(testo="prova Contenuto", url ="nonsourl")
session = schema.dbSession()
c.tiles.append(t)
c.tiles[0].contenuto = cont
dum = schema.ColonneSchema().dump(c)
#print(c.tiles[0].contenuto)
#print(dum)
#c.tiles.append(t2)
#c.tiles[0].append(cont)
session.add(c)
session.commit()
#tiles
#cols = session.query(schema.Tile).all()
# print(jsonify(cols))
#t_schema = schema.TileSchema()
# print(c_schema.dump(cols))
#for col in cols:
# print(jsonify(col))
# print(t_schema.dump(col))
def test3():
session = schema.dbSession()
query = "select Colonne.titolo, Colonne.stato, Tile.id, Tile.autore, Tile.contenuto, Contenuto.testo,Contenuto.url" \
" from Colonne inner join Tile" \
" on Colonne.titolo = Tile.colonna_id left join " \
"Contenuto on Contenuto.id = Tile.contenuto;"
rs = session.execute(query)
#row_headers = [x[0] for x in rs.description]
resultset = [dict(row) for row in rs]
print(resultset)
#for row in rs:
# print(jsonify(row))
def test4():
query="insert into Contenuto values ('gna', 'codio')"
session = schema.dbSession()
rs = session.execute(query)
def test5():
query = "SELECT * FROM Contenuto"
session = schema.dbSession()
rs = session.execute(query)
presultset = [dict(row) for row in rs]
print(presultset)
def test6():
session = schema.dbSession()
rs = session.execute("PRAGMA foreign_keys;")
for row in rs:
print(row)
def test7():
session = schema.dbSession()
rs1 = session.query(schema.Colonne).select_from(schema.Colonne).join(schema.Tile).join(schema.Contenuto).all()
rs = session.query(schema.Colonne).all()
scCol = schema.ColonneSchema(many=True)
print(scCol.dump(rs))
#for row in rs:
# print(scCol.dump(rs))
def test8():
c = schema.Colonne(titolo="prova4", stato="boh")
t = schema.Tile(titolo="provaTile", autore="nonso", colonna = c)
t.colonna = c
cont = schema.Contenuto(testo="prova Contenuto", url="nonsourl")
t.contenuto = cont
rs = schema.ContenutoSchema().dump(cont)
pprint(rs)
rs = schema.ColonneSchema().dump(c)
pprint(rs)
def test9():
session = schema.dbSession()
rs = session.query(schema.Colonne).all()
pprint(schema.ColonneSchema(many=True).dump(rs))
def test10():
session = schema.dbSession()
colonne = session.query(schema.Colonne).all()
dizionario = {}
rs = schema.ColonneSchema(many=True).dump(colonne)
tileSchema = schema.TileSchema()
contenutoSchema = schema.ContenutoSchema()
pprint(rs)
listaTot= []
listaColonne = []
for col in colonne:
dizionario = {}
print("***************COlonna********************")
print(schema.ColonneSchema().dump(col))
listaTile = []
dizionario['colonne'] = schema.ColonneSchema().dump(col)
for tile in col.tile:
print("***************Tile********************")
#pprint(tileSchema.dump(tile))
print("***************Contenuto********************")
#pprint(contenutoSchema.dump(tile.contenuto))
print("***************Contenuto********************")
print("***************Tile********************")
tiletmp= tileSchema.dump(tile)
tiletmp['contenuto'] = contenutoSchema.dump(tile.contenuto)
listaTile.append(tiletmp)
dizionario['tile'] = listaTile
print("***************COlonna********************")
listaTot.append(dizionario)
return listaTot
def test11():
session = schema.dbSession()
colonne = session.query(schema.Colonne).all()
dizionario = {}
rs = schema.ColonneSchema(many=True).dump(colonne)
tileSchema = schema.TileSchema()
contenutoSchema = schema.ContenutoSchema()
pprint(rs)
listaColonne= []
for col in colonne:
print(schema.ColonneSchema().dump(col))
def test12():
sess = schema.dbSession()
r=sess.query(schema.Colonne).filter_by(titolo="prova4").first()
sess.delete(r)
sess.commit()
print(r)
def test13():#update
sess = schema.dbSession()
r=sess.query(schema.Colonne).filter_by(titolo="prova4").first()
r.titolo ="provagay"
sess.add(r)
sess.commit()
print(r)
def test14():#cancella elemento
sess = schema.dbSession()
r=sess.query(schema.Tile).filter_by(id="1").first()
sess.delete(r)
sess.commit()
print(r)
def test15():#update elemento
session = schema.dbSession()
c=session.query(schema.Colonne).filter_by(titolo="prova4").first()
t = schema.Tile(titolo="provaTile", autore="nonso")
t2 = schema.Tile(titolo="provaTile", autore="nonso")
cont = schema.Contenuto(testo="prova Contenuto", url ="nonsourl")
c.tiles.append(t)
c.tiles[0].contenuto = cont
dum = schema.ColonneSchema().dump(c)
#print(c.tiles[0].contenuto)
#print(dum)
#c.tiles.append(t2)
#c.tiles[0].append(cont)
session.add(c)
session.commit()
def query_to_dict(ret):
if ret is not None:
return [{key: value for key, value in row.items()} for row in ret if row is not None]
else:
return [{}]
if __name__ == "__main__":
pprint(test10())
schema.py
from flask import Flask
from marshmallow_sqlalchemy.fields import Nested
from sqlalchemy import Column, Integer, String, create_engine, ForeignKey
from sqlalchemy.orm import relationship, scoped_session, sessionmaker, joinedload,backref
from sqlalchemy.ext.declarative import declarative_base
from marshmallow_sqlalchemy import SQLAlchemyAutoSchema, fields
from flask_marshmallow import Marshmallow
app = Flask(__name__)
Base = declarative_base()
engine = create_engine('sqlite:///../database/mydb.db', convert_unicode=True)
dbSession = scoped_session(sessionmaker(autocommit=False,
autoflush=False,
bind=engine,
))
ma = Marshmallow(app)
class Colonne(Base):
__tablename__ = "Colonne"
id = Column(Integer,primary_key=True,autoincrement=True)
titolo = Column(String, unique=True)
stato = Column(String)
tile = relationship("Tile", back_populates ="colonna", cascade="all,delete")
class Tile(Base):
__tablename__ = "Tile"
id = Column(Integer, primary_key=True, autoincrement=True)
autore = Column(String)
titolo = Column(String)
colonna_id = Column(Integer, ForeignKey("Colonne.id"))
colonna = relationship("Colonne", backref="tiles", overlaps="tile")
contenuto_id = Column(Integer, ForeignKey('Contenuto.id', ondelete='CASCADE'))
contenuto = relationship("Contenuto", backref="tile", cascade="all,delete")
class Contenuto(Base):
__tablename__ = "Contenuto"
id = Column(Integer, primary_key=True, autoincrement=True )
testo = Column(String)
url = Column(String)
class ColonneSchema(SQLAlchemyAutoSchema):
class Meta:
model = Colonne
class TileSchema(SQLAlchemyAutoSchema):
class Meta:
model = Tile
colonna = Nested(ColonneSchema)
class ContenutoSchema(SQLAlchemyAutoSchema):
class Meta:
model = Contenuto
include_fk = True
include_relationships = True
load_instance = False
tile = fields.Nested(TileSchema)
# -----------------------crea tabelle----------------------------
if __name__ == '__main__':
Base.metadata.create_all(bind=engine)
crud.py
from flask_restful import Api, Resource
from data import schema
from pprint import pprint
def getColonne():
session = schema.dbSession()
colonne = session.query(schema.Colonne).all()
dizionario = {}
rs = schema.ColonneSchema(many=True).dump(colonne)
tileSchema = schema.TileSchema()
contenutoSchema = schema.ContenutoSchema()
# pprint(rs)
listaTot = []
listaColonne = []
for col in colonne:
dizionario = {}
# print("***************COlonna********************")
# print(schema.ColonneSchema().dump(col))
listaTile = []
dizionario['colonne'] = schema.ColonneSchema().dump(col)
for tile in col.tile:
# print("***************Tile********************")
# pprint(tileSchema.dump(tile))
# print("***************Contenuto********************")
# pprint(contenutoSchema.dump(tile.contenuto))
# print("***************Contenuto********************")
# print("***************Tile********************")
tiletmp = tileSchema.dump(tile)
tiletmp['contenuto'] = contenutoSchema.dump(tile.contenuto)
listaTile.append(tiletmp)
dizionario['tile'] = listaTile
# print("***************COlonna********************")
listaTot.append(dizionario)
return listaTot
if __name__ == "__main__":
pprint(getColonne())
class ProvaApi(Resource):
def get(self):
return {"prova": "sono una prova"}
class getAllColumns(Resource):
def get(self):
return getColonne()
现在,当我 运行 "python crud.py" getColonne()
工作时。
否则,当端点被调用时,当应用程序 运行 正在“getAllColumns”时,它无法打开数据库 sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) unable to打开数据库文件.
结构是这样的: 项目 数据 schema.py 数据库 mydb.db 休息API crud.py 测试 test.py app.py
当然,虽然 运行 宁所有 test.py 我没有问题。
有什么问题?
您在 create_engine()
中使用了相对路径。使用绝对路径将解决您的问题。如果它相对于文件很重要,请使用 create_engine(f"sqlite:///{os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'database/mydb.db')"})
.