离线 Python 脚本中的 OAuth 和 redirect_uri
OAuth and redirect_uri in offline Python script
我目前正在尝试编写一个 Python 脚本,它将使用 Deviantart 的 API 来自动随机播放我的收藏夹。为此,我需要先登录我的脚本。 Deviantart 使用 OAuth2 身份验证,这需要 redirect_uri,据我了解,它应该是我的应用程序所在的服务器 运行。
但是我 运行 脚本在我的本地计算机上(而不是在服务器上)并且只是通过 Python 的 Requests
库发送 http 请求。当 OAuth 过程将身份验证令牌所需的代码作为 GET
调用的参数发送到 redirect_uri 时,我该如何进行身份验证,这对我来说特别无处可寻?没有 运行 服务器就无法进行身份验证吗?
编辑
我的问题仍然是我 运行 一个简单的离线脚本,我不确定如何通过它进行身份验证。
这是我目前的验证码:
import binascii, os, requests
def auth():
request = 'https://www.deviantart.com/oauth2/authorize'
state = binascii.hexlify(os.urandom(160))
params = {
'response_type': 'token',
'client_id': 1,
'redirect_uri': 'https://localhost:8080',
'state': state,
'scope': 'user'
}
r = requests.get(request, params)
print(r)
打印的响应只是一个 200 HTTP 代码,而不是访问令牌(很明显,因为没有在任何地方输入用户名和密码)。请求被发送到 DA 的授权页面,但由于该页面本身并没有在我的脚本中实际打开,所以我无法在任何地方输入我的用户名和密码来登录。而且我也不能直接在 GET
请求中发送用户名和密码以这种方式进行身份验证(同样显然,因为像那样发送密码是一个糟糕的主意)。
最好我想要一种方法,让用户(我)在脚本 运行 所在的控制台中提示输入用户名和密码,然后让脚本在用户完成后继续执行登录成功。
或者,如果上述方法不可行,脚本应在浏览器中打开授权网页,然后在用户登录后继续执行。
我将如何在 Python 中实现这两种解决方案中的任何一种?
您应该使用收到的代码获取授权令牌。此令牌将用于之后访问 DeviantArt。
请参阅 https://www.deviantart.com/developers/authentication(第 "Using The Authorization Code Grant" 部分)。
如果您的应用程序处于离线状态,则您不能使用授权代码或隐式授权类型:这两种流程都需要重定向 URI。
由于无法从 Internet 访问您的 python 脚本,并且 Deviantart 不允许使用其他授权类型(客户端凭据除外,但与您的情况无关),那么您将不会能够颁发任何访问令牌。
您的应用程序必须可以从 Internet 访问。
根据请求,我正在使用我最终用于脚本身份验证的代码更新此问题,希望它能对某人有所帮助。
import webbrowser
import requests
import urllib.parse
import binascii
import os
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
AUTH = 'https://www.deviantart.com/oauth2/authorize'
TOKEN = 'https://www.deviantart.com/oauth2/token'
code = ''
state = binascii.hexlify(os.urandom(20)).decode('utf-8')
class Communicator:
def __init__(self):
self.client_id = '<insert-actual-id>' # You get these two from the DA developer API page
self.client_secret = '<insert-actual-secret>' # but it's safer if you store them in a separate file
self.server, self.port = 'localhost', 8080
self._redirect_uri = f'http://{self.server}:{self.port}'
self._last_request_time = 0
def auth(self, *args):
scope = ' '.join(args)
params = {
'response_type': 'code',
'client_id': self.client_id,
'redirect_uri': self._redirect_uri,
'scope': scope,
'state': state
}
request = requests.Request('GET', AUTH, params).prepare()
request.prepare_url(AUTH, params)
webbrowser.open(request.url)
server = HTTPServer((self.server, self.port), RequestHandler)
server.handle_request()
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self._redirect_uri
}
self._get_token(params)
def _get_token(self, params):
r = requests.get(TOKEN, params).json()
self.token = r['access_token']
self.refresh_token = r['refresh_token']
def _refresh_token(self):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'refresh_token',
'refresh_token': self.refresh_token
}
self._get_token(params)
def _request(self, func, url, params, sleep=5, cooldown=600):
t = time.time()
if t - self._last_request_time < sleep:
time.sleep(sleep - t + self._last_request_time)
self._last_request_time = t
max_sleep = 16 * sleep
params['access_token'] = self.token
while True:
try:
r = func(url, params).json()
if 'error_code' in r and r['error_code'] == 429:
sleep *= 2
time.sleep(sleep)
if sleep > max_sleep:
raise ConnectionError("Request timed out - server is busy.")
elif 'error' in r and r['error'] == 'user_api_threshold':
raise ConnectionError("Too many requests")
elif 'error' in r and r['error'] == 'invalid_token':
print("Refreshing token.")
self._refresh_token()
params['access_token'] = self.token
else:
return r
except ConnectionError:
print(f"Request limit reached - waiting {cooldown // 60} minutes before retrying...")
time.sleep(cooldown)
def get(self, url, params):
return self._request(requests.get, url, params)
def post(self, url, params):
return self._request(requests.post, url, params)
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
global code
self.close_connection = True
query = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
if not query['state'] or query['state'][0] != state:
raise RuntimeError("state argument missing or invalid")
code = query['code']
BROWSE = 'browse'
BROWSE_MORE_LIKE_THIS = 'browse.mlt'
COLLECTION = 'collection'
COMMENT = 'comment.post'
FEED = 'feed'
GALLERY = 'gallery'
MESSAGE = 'message'
NOTE = 'note'
PUBLISH = 'publish'
STASH = 'stash'
USER = 'user'
USER_MANAGE = 'user.manage'
if __name__ == '__main__':
com = Communicator()
com.auth(BROWSE, COLLECTION) # request specific permissions
... # do stuff with com.get() and com.post() requests
我目前正在尝试编写一个 Python 脚本,它将使用 Deviantart 的 API 来自动随机播放我的收藏夹。为此,我需要先登录我的脚本。 Deviantart 使用 OAuth2 身份验证,这需要 redirect_uri,据我了解,它应该是我的应用程序所在的服务器 运行。
但是我 运行 脚本在我的本地计算机上(而不是在服务器上)并且只是通过 Python 的 Requests
库发送 http 请求。当 OAuth 过程将身份验证令牌所需的代码作为 GET
调用的参数发送到 redirect_uri 时,我该如何进行身份验证,这对我来说特别无处可寻?没有 运行 服务器就无法进行身份验证吗?
编辑
我的问题仍然是我 运行 一个简单的离线脚本,我不确定如何通过它进行身份验证。
这是我目前的验证码:
import binascii, os, requests
def auth():
request = 'https://www.deviantart.com/oauth2/authorize'
state = binascii.hexlify(os.urandom(160))
params = {
'response_type': 'token',
'client_id': 1,
'redirect_uri': 'https://localhost:8080',
'state': state,
'scope': 'user'
}
r = requests.get(request, params)
print(r)
打印的响应只是一个 200 HTTP 代码,而不是访问令牌(很明显,因为没有在任何地方输入用户名和密码)。请求被发送到 DA 的授权页面,但由于该页面本身并没有在我的脚本中实际打开,所以我无法在任何地方输入我的用户名和密码来登录。而且我也不能直接在 GET
请求中发送用户名和密码以这种方式进行身份验证(同样显然,因为像那样发送密码是一个糟糕的主意)。
最好我想要一种方法,让用户(我)在脚本 运行 所在的控制台中提示输入用户名和密码,然后让脚本在用户完成后继续执行登录成功。
或者,如果上述方法不可行,脚本应在浏览器中打开授权网页,然后在用户登录后继续执行。
我将如何在 Python 中实现这两种解决方案中的任何一种?
您应该使用收到的代码获取授权令牌。此令牌将用于之后访问 DeviantArt。
请参阅 https://www.deviantart.com/developers/authentication(第 "Using The Authorization Code Grant" 部分)。
如果您的应用程序处于离线状态,则您不能使用授权代码或隐式授权类型:这两种流程都需要重定向 URI。
由于无法从 Internet 访问您的 python 脚本,并且 Deviantart 不允许使用其他授权类型(客户端凭据除外,但与您的情况无关),那么您将不会能够颁发任何访问令牌。
您的应用程序必须可以从 Internet 访问。
根据请求,我正在使用我最终用于脚本身份验证的代码更新此问题,希望它能对某人有所帮助。
import webbrowser
import requests
import urllib.parse
import binascii
import os
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
AUTH = 'https://www.deviantart.com/oauth2/authorize'
TOKEN = 'https://www.deviantart.com/oauth2/token'
code = ''
state = binascii.hexlify(os.urandom(20)).decode('utf-8')
class Communicator:
def __init__(self):
self.client_id = '<insert-actual-id>' # You get these two from the DA developer API page
self.client_secret = '<insert-actual-secret>' # but it's safer if you store them in a separate file
self.server, self.port = 'localhost', 8080
self._redirect_uri = f'http://{self.server}:{self.port}'
self._last_request_time = 0
def auth(self, *args):
scope = ' '.join(args)
params = {
'response_type': 'code',
'client_id': self.client_id,
'redirect_uri': self._redirect_uri,
'scope': scope,
'state': state
}
request = requests.Request('GET', AUTH, params).prepare()
request.prepare_url(AUTH, params)
webbrowser.open(request.url)
server = HTTPServer((self.server, self.port), RequestHandler)
server.handle_request()
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self._redirect_uri
}
self._get_token(params)
def _get_token(self, params):
r = requests.get(TOKEN, params).json()
self.token = r['access_token']
self.refresh_token = r['refresh_token']
def _refresh_token(self):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'refresh_token',
'refresh_token': self.refresh_token
}
self._get_token(params)
def _request(self, func, url, params, sleep=5, cooldown=600):
t = time.time()
if t - self._last_request_time < sleep:
time.sleep(sleep - t + self._last_request_time)
self._last_request_time = t
max_sleep = 16 * sleep
params['access_token'] = self.token
while True:
try:
r = func(url, params).json()
if 'error_code' in r and r['error_code'] == 429:
sleep *= 2
time.sleep(sleep)
if sleep > max_sleep:
raise ConnectionError("Request timed out - server is busy.")
elif 'error' in r and r['error'] == 'user_api_threshold':
raise ConnectionError("Too many requests")
elif 'error' in r and r['error'] == 'invalid_token':
print("Refreshing token.")
self._refresh_token()
params['access_token'] = self.token
else:
return r
except ConnectionError:
print(f"Request limit reached - waiting {cooldown // 60} minutes before retrying...")
time.sleep(cooldown)
def get(self, url, params):
return self._request(requests.get, url, params)
def post(self, url, params):
return self._request(requests.post, url, params)
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
global code
self.close_connection = True
query = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
if not query['state'] or query['state'][0] != state:
raise RuntimeError("state argument missing or invalid")
code = query['code']
BROWSE = 'browse'
BROWSE_MORE_LIKE_THIS = 'browse.mlt'
COLLECTION = 'collection'
COMMENT = 'comment.post'
FEED = 'feed'
GALLERY = 'gallery'
MESSAGE = 'message'
NOTE = 'note'
PUBLISH = 'publish'
STASH = 'stash'
USER = 'user'
USER_MANAGE = 'user.manage'
if __name__ == '__main__':
com = Communicator()
com.auth(BROWSE, COLLECTION) # request specific permissions
... # do stuff with com.get() and com.post() requests