Python Django:为什么 Google OAuth2 Token 在过期时间之后仍然有效?
Python Django: Why Google OAuth2 Token is valid after expiration time?
我正在使用 Google OAuth2 流程,如下所述:https://developers.google.com/identity/protocols/oauth2/web-server.
首先我检查用户凭据是否已经在数据库中,如果没有则启动身份验证流程。将代码交换为访问令牌后,凭据将保存在数据库中。与访问令牌和刷新令牌一起保存令牌到期日期时间,即 1 小时 (credentials.expiry)。
然后我想检查如何刷新访问令牌,所以我在令牌过期之后特意请求了 API。之前保存的凭据在数据库中,所以我从那里获取它们,检查其有效性,并更新访问令牌。但是,由于某种原因,令牌被判定为有效,而且向 Google API 发出测试请求时也能成功,不会引发错误。
我不明白为什么令牌有效,但它显然已过期。我还在这里检查了令牌有效性:https://www.googleapis.com/oauth2/v3/tokeninfo?access_token=AT 并得到了 { "error_description": "Invalid Value" }.
我的代码:
def google_oauth_init_flow(request):
    """Start the Google OAuth2 authorization flow and redirect to Google.

    Reads ``user_id`` from the query string, loads the matching ``User``,
    stores the user's id (as a string) in the session under ``'state'`` and
    sends the same value to Google as the OAuth ``state`` parameter.

    Returns an ``HttpResponseRedirect`` to the generated authorization URL.
    """
    requested_id = request.GET.get('user_id', None)
    print(requested_id)
    user = User.objects.get(id=requested_id)

    # The user id doubles as the OAuth state value for this flow.
    state_value = str(user.id)
    request.session['state'] = state_value

    secrets_path = os.path.abspath(
        os.path.join(os.path.dirname(__file__), KEY_FILE_LOCATION))
    flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
        secrets_path, scopes=SCOPES)
    flow.redirect_uri = REDIRECT_URI

    # access_type='offline' asks for a refresh token so the access token can
    # be renewed without re-prompting the user; include_granted_scopes enables
    # incremental authorization.
    authorization_url, _ = flow.authorization_url(
        access_type='offline',
        include_granted_scopes='true',
        state=state_value,
    )
    return HttpResponseRedirect(authorization_url)
def google_oauth_exchange_token(request):
    """Exchange the authorization code for tokens and persist them.

    Rebuilds the OAuth flow with the ``state`` stored in the session, fetches
    the token from the full callback URL and hands the resulting credentials
    to ``save_credentials``.

    Returns an ``HttpResponse`` in every case: the response produced by
    ``save_credentials`` when it reports a duplicate, a unique-constraint
    message when the database rejects the row, or a success message.

    Raises:
        IntegrityError: re-raised when the database error is not a
            unique-constraint violation, instead of being swallowed.
    """
    state = request.session.get('state', 'No state')
    print(state)
    flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
        os.path.abspath(os.path.join(os.path.dirname(__file__), KEY_FILE_LOCATION)),
        scopes=SCOPES,
        state=state)
    flow.redirect_uri = REDIRECT_URI
    authorization_response = request.build_absolute_uri()
    flow.fetch_token(authorization_response=authorization_response)
    credentials = flow.credentials

    # save creds in DB
    try:
        result = save_credentials(credentials, state)
    except IntegrityError as e:
        # Python 3 exceptions have no ``.message`` attribute; inspect the
        # string form. Lower-cased because backends differ in capitalization.
        if 'unique constraint' in str(e).lower():
            # duplicate detected
            return HttpResponse('Violation of unique constraint')
        raise

    # save_credentials returns an HttpResponse for duplicates and the result
    # of Model.save() otherwise; a view must always return a response.
    if isinstance(result, HttpResponse):
        return result
    return HttpResponse('Credentials saved')
def google_oauth_check_token(credentials, user_id):
    """Return usable credentials, refreshing the access token when expired.

    Args:
        credentials: either a credentials object or a dict, which is converted
            with ``init_creds_instance``.
        user_id: id of the user owning the stored token row.

    Returns:
        The credentials object (refreshed in place when it was expired).

    NOTE(review): google-auth reports ``expired`` as False when ``expiry`` is
    unset, so credentials rebuilt from a dict will always look valid unless
    ``init_creds_instance`` restores the stored expiry - confirm that it does.
    """
    # convert dict into credentials instance
    if isinstance(credentials, dict):
        credentials = init_creds_instance(credentials)

    if not credentials.expired:
        print('TOKEN', credentials.token)
        print('EXPIRY', credentials.expiry)
        print('REFRESH TOKEN', credentials.refresh_token)
        print('Token is valid')
        return credentials

    print('Token is expired getting new one...')
    # refresh credentials
    request = google.auth.transport.requests.Request()
    credentials.refresh(request)

    # Persist the new access token using the same columns save_credentials
    # writes (token / expires_at) and the same lookup key (user_id), so the
    # next load sees the refreshed values.
    refreshed_fields = {'token': credentials.token}
    if credentials.expiry is not None:
        refreshed_fields['expires_at'] = credentials.expiry.strftime('%Y-%m-%d %H:%M:%S')
    # NOTE(review): a user with several Google accounts has several 'ga'
    # rows; all of them are updated here - narrow the filter if that matters.
    SystemServiceTokens.objects.filter(user_id=user_id, service_id='ga').update(**refreshed_fields)
    return credentials
def get_user_info(credentials):
    """Return the result of the ``oauth2`` v2 ``userinfo().get()`` call.

    Builds the service client with the given credentials and executes the
    request; callers in this file read the ``'email'`` key from the result.
    """
    oauth2_service = build('oauth2', 'v2', credentials=credentials)
    userinfo_request = oauth2_service.userinfo().get()
    return userinfo_request.execute()
def save_credentials(credentials, state):
    """Store freshly obtained Google credentials for a user.

    Args:
        credentials: credentials object exposing ``token``, ``refresh_token``
            and ``expiry``.
        state: the user's id, as carried through the OAuth ``state`` value.

    Returns:
        The result of ``Model.save()`` when a new row is written, or an
        ``HttpResponse('Unique constraint violation')`` when a row for the
        same user, email and service already exists.
    """
    user = User.objects.get(id=state)
    google_oauth_check_token(credentials, user.id)
    # get email
    user_info = get_user_info(credentials)
    email = user_info['email']

    # check duplicate values
    # Django managers are only reachable through the model class; accessing
    # ``.objects`` on an instance raises AttributeError.
    already_stored = SystemServiceTokens.objects.filter(
        user_id=user.id, email=email, service_id='ga').exists()
    if already_stored:
        return HttpResponse('Unique constraint violation')

    # Reached only when no matching row exists yet.
    print("Entry contained in queryset")
    model = SystemServiceTokens()
    model.token = credentials.token
    model.refresh_token = credentials.refresh_token
    model.expires_at = datetime.datetime.strftime(credentials.expiry, '%Y-%m-%d %H:%M:%S')
    model.user = user
    model.email = email
    model.service_id = 'ga'
    return model.save()
# No errors
def test_request_google(request):
    """Debug helper: load stored credentials for user 9 and call Google APIs.

    Fetches the first ``SystemServiceTokens`` row for the hard-coded user,
    passes it through ``google_oauth_check_token``, prints the userinfo
    response and builds a Drive v2 client with the same credentials.
    """
    stored_row = SystemServiceTokens.objects.filter(user_id=9).first()  # DEBUG ID
    stored_fields = model_to_dict(stored_row)
    print(stored_fields)
    credentials = google_oauth_check_token(stored_fields, 9)

    # test
    oauth2_service = build('oauth2', 'v2', credentials=credentials)
    profile = oauth2_service.userinfo().get().execute()
    print(profile)
    drive = build('drive', 'v2', credentials=credentials)
如果 Python 客户端库能检测到刷新令牌,它会自动为您刷新访问令牌。假设您的代码工作正常,访问令牌永远不会处于过期状态,因为库会在它过期前五分钟刷新它。
我的 Python 不是最好的,但在源代码中可能是这样。
http.py#L1559
我正在使用 Google OAuth2 流程,如下所述:https://developers.google.com/identity/protocols/oauth2/web-server.
首先我检查用户凭据是否已经在数据库中,如果没有则启动身份验证流程。将代码交换为访问令牌后,凭据将保存在数据库中。与访问令牌和刷新令牌一起保存令牌到期日期时间,即 1 小时 (credentials.expiry)。
然后我想检查如何刷新访问令牌,所以我在令牌过期之后特意请求了 API。之前保存的凭据在数据库中,所以我从那里获取它们,检查其有效性,并更新访问令牌。但是,由于某种原因,令牌被判定为有效,而且向 Google API 发出测试请求时也能成功,不会引发错误。
我不明白为什么令牌有效,但它显然已过期。我还在这里检查了令牌有效性:https://www.googleapis.com/oauth2/v3/tokeninfo?access_token=AT 并得到了 { "error_description": "Invalid Value" }.
我的代码:
def google_oauth_init_flow(request):
    """Start the Google OAuth2 authorization flow and redirect to Google.

    Reads ``user_id`` from the query string, loads the matching ``User``,
    stores the user's id (as a string) in the session under ``'state'`` and
    sends the same value to Google as the OAuth ``state`` parameter.

    Returns an ``HttpResponseRedirect`` to the generated authorization URL.
    """
    requested_id = request.GET.get('user_id', None)
    print(requested_id)
    user = User.objects.get(id=requested_id)

    # The user id doubles as the OAuth state value for this flow.
    state_value = str(user.id)
    request.session['state'] = state_value

    secrets_path = os.path.abspath(
        os.path.join(os.path.dirname(__file__), KEY_FILE_LOCATION))
    flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
        secrets_path, scopes=SCOPES)
    flow.redirect_uri = REDIRECT_URI

    # access_type='offline' asks for a refresh token so the access token can
    # be renewed without re-prompting the user; include_granted_scopes enables
    # incremental authorization.
    authorization_url, _ = flow.authorization_url(
        access_type='offline',
        include_granted_scopes='true',
        state=state_value,
    )
    return HttpResponseRedirect(authorization_url)
def google_oauth_exchange_token(request):
    """Exchange the authorization code for tokens and persist them.

    Rebuilds the OAuth flow with the ``state`` stored in the session, fetches
    the token from the full callback URL and hands the resulting credentials
    to ``save_credentials``.

    Returns an ``HttpResponse`` in every case: the response produced by
    ``save_credentials`` when it reports a duplicate, a unique-constraint
    message when the database rejects the row, or a success message.

    Raises:
        IntegrityError: re-raised when the database error is not a
            unique-constraint violation, instead of being swallowed.
    """
    state = request.session.get('state', 'No state')
    print(state)
    flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
        os.path.abspath(os.path.join(os.path.dirname(__file__), KEY_FILE_LOCATION)),
        scopes=SCOPES,
        state=state)
    flow.redirect_uri = REDIRECT_URI
    authorization_response = request.build_absolute_uri()
    flow.fetch_token(authorization_response=authorization_response)
    credentials = flow.credentials

    # save creds in DB
    try:
        result = save_credentials(credentials, state)
    except IntegrityError as e:
        # Python 3 exceptions have no ``.message`` attribute; inspect the
        # string form. Lower-cased because backends differ in capitalization.
        if 'unique constraint' in str(e).lower():
            # duplicate detected
            return HttpResponse('Violation of unique constraint')
        raise

    # save_credentials returns an HttpResponse for duplicates and the result
    # of Model.save() otherwise; a view must always return a response.
    if isinstance(result, HttpResponse):
        return result
    return HttpResponse('Credentials saved')
def google_oauth_check_token(credentials, user_id):
    """Return usable credentials, refreshing the access token when expired.

    Args:
        credentials: either a credentials object or a dict, which is converted
            with ``init_creds_instance``.
        user_id: id of the user owning the stored token row.

    Returns:
        The credentials object (refreshed in place when it was expired).

    NOTE(review): google-auth reports ``expired`` as False when ``expiry`` is
    unset, so credentials rebuilt from a dict will always look valid unless
    ``init_creds_instance`` restores the stored expiry - confirm that it does.
    """
    # convert dict into credentials instance
    if isinstance(credentials, dict):
        credentials = init_creds_instance(credentials)

    if not credentials.expired:
        print('TOKEN', credentials.token)
        print('EXPIRY', credentials.expiry)
        print('REFRESH TOKEN', credentials.refresh_token)
        print('Token is valid')
        return credentials

    print('Token is expired getting new one...')
    # refresh credentials
    request = google.auth.transport.requests.Request()
    credentials.refresh(request)

    # Persist the new access token using the same columns save_credentials
    # writes (token / expires_at) and the same lookup key (user_id), so the
    # next load sees the refreshed values.
    refreshed_fields = {'token': credentials.token}
    if credentials.expiry is not None:
        refreshed_fields['expires_at'] = credentials.expiry.strftime('%Y-%m-%d %H:%M:%S')
    # NOTE(review): a user with several Google accounts has several 'ga'
    # rows; all of them are updated here - narrow the filter if that matters.
    SystemServiceTokens.objects.filter(user_id=user_id, service_id='ga').update(**refreshed_fields)
    return credentials
def get_user_info(credentials):
    """Return the result of the ``oauth2`` v2 ``userinfo().get()`` call.

    Builds the service client with the given credentials and executes the
    request; callers in this file read the ``'email'`` key from the result.
    """
    oauth2_service = build('oauth2', 'v2', credentials=credentials)
    userinfo_request = oauth2_service.userinfo().get()
    return userinfo_request.execute()
def save_credentials(credentials, state):
    """Store freshly obtained Google credentials for a user.

    Args:
        credentials: credentials object exposing ``token``, ``refresh_token``
            and ``expiry``.
        state: the user's id, as carried through the OAuth ``state`` value.

    Returns:
        The result of ``Model.save()`` when a new row is written, or an
        ``HttpResponse('Unique constraint violation')`` when a row for the
        same user, email and service already exists.
    """
    user = User.objects.get(id=state)
    google_oauth_check_token(credentials, user.id)
    # get email
    user_info = get_user_info(credentials)
    email = user_info['email']

    # check duplicate values
    # Django managers are only reachable through the model class; accessing
    # ``.objects`` on an instance raises AttributeError.
    already_stored = SystemServiceTokens.objects.filter(
        user_id=user.id, email=email, service_id='ga').exists()
    if already_stored:
        return HttpResponse('Unique constraint violation')

    # Reached only when no matching row exists yet.
    print("Entry contained in queryset")
    model = SystemServiceTokens()
    model.token = credentials.token
    model.refresh_token = credentials.refresh_token
    model.expires_at = datetime.datetime.strftime(credentials.expiry, '%Y-%m-%d %H:%M:%S')
    model.user = user
    model.email = email
    model.service_id = 'ga'
    return model.save()
# No errors
def test_request_google(request):
    """Debug helper: load stored credentials for user 9 and call Google APIs.

    Fetches the first ``SystemServiceTokens`` row for the hard-coded user,
    passes it through ``google_oauth_check_token``, prints the userinfo
    response and builds a Drive v2 client with the same credentials.
    """
    stored_row = SystemServiceTokens.objects.filter(user_id=9).first()  # DEBUG ID
    stored_fields = model_to_dict(stored_row)
    print(stored_fields)
    credentials = google_oauth_check_token(stored_fields, 9)

    # test
    oauth2_service = build('oauth2', 'v2', credentials=credentials)
    profile = oauth2_service.userinfo().get().execute()
    print(profile)
    drive = build('drive', 'v2', credentials=credentials)
如果 Python 客户端库能检测到刷新令牌,它会自动为您刷新访问令牌。假设您的代码工作正常,访问令牌永远不会处于过期状态,因为库会在它过期前五分钟刷新它。
我的 Python 不是最好的,但在源代码中可能是这样。 http.py#L1559