线程只能在 Django Channels 中启动一次

Threads can only be started once in Django Channels

我创建了一个简单的 Django Channels 消费者,它应该连接到外部源、检索数据并将其发送到客户端。所以,用户打开页面 > 消费者连接到外部服务并获取数据 > 数据发送到websocket。

这是我的代码:

import json
from channels.generic.websocket import WebsocketConsumer, AsyncConsumer, AsyncJsonWebsocketConsumer

from binance.client import Client
import json
from binance.websockets import BinanceSocketManager
import time
import asyncio

client = Client('', '')

trades = client.get_recent_trades(symbol='BNBBTC')
bm = BinanceSocketManager(client)
class EchoConsumer(AsyncJsonWebsocketConsumer):


    async def connect(self):
        await self.accept()
        await self.send_json('test')


        bm.start_trade_socket('BNBBTC', self.process_message)
        bm.start()


    def process_message(self, message):
        JSON1 = json.dumps(message)
        JSON2 = json.loads(JSON1)

        #define variables
        Rate = JSON2['p']
        Quantity = JSON2['q']
        Symbol = JSON2['s']
        Order = JSON2['m']

        asyncio.create_task(self.send_json(Rate))
        print(Rate)

当我打开一页时,此代码有效;但是,如果我尝试用一​​个新帐户打开一个新的 window,它会抛出以下错误:

File "C:\Users\User\Desktop\Heroku\github\master\myapp\consumers.py", line 54, in connect
    bm.start()
  File "C:\Users\User\lib\threading.py", line 843, in start
    raise RuntimeError("threads can only be started once")
  threads can only be started once

我是 Channels 的新手,所以这是一个新手问题,但我该如何解决这个问题?我想做的是:用户打开页面并获取数据,另一个用户打开页面并获取数据;有没有办法做到这一点?还是我只是误解了 Django Channels 和 websockets 的工作原理?

我不是 Django 开发人员,但如果我理解正确的话,函数 connect 被调用了不止一次——并且 bm.start 引用了很可能在 [=12] 中创建的同一个线程=](或连接中的其他地方)。总之,当 bm.start 被调用时,一个线程被启动,当它再次完成时,你会得到那个错误。

这里start()开始线程的activity.

每个线程对象最多调用一次。您已将 BinanceSocketManager 的全局对象设为 "bm"。

如果在同一个线程对象上多次调用,它将始终引发 RuntimeError。

请参考下面提到的代码,它可能对你有帮助

from channels.generic.websocket import WebsocketConsumer, AsyncConsumer, AsyncJsonWebsocketConsumer

from binance.client import Client
import json
from binance.websockets import BinanceSocketManager
import time
import asyncio


class EchoConsumer(AsyncJsonWebsocketConsumer):
    client = Client('', '')

    trades = client.get_recent_trades(symbol='BNBBTC')
    bm = BinanceSocketManager(client)

    async def connect(self):
        await self.accept()
        await self.send_json('test')


        self.bm.start_trade_socket('BNBBTC', self.process_message)
        self.bm.start()


    def process_message(self, message):
        JSON1 = json.dumps(message)
        JSON2 = json.loads(JSON1)

        #define variables
        Rate = JSON2['p']
        Quantity = JSON2['q']
        Symbol = JSON2['s']
        Order = JSON2['m']

        asyncio.create_task(self.send_json(Rate))
        print(Rate)

你真的需要辅助线程吗?

class EchoConsumer(AsyncJsonWebsocketConsumer):

    symbol = ''

    async def connect(self):
        self.symbol = 'BNBBTC'
        # or, more probably, retrieve the value for "symbol" from query_string
        # so the client can specify which symbol he's interested into:
        #    socket = new WebSocket("ws://.../?symbol=BNBBTC");
        await self.accept()

    def process_message(self, message):
        # PSEUDO-CODE BELOW !
        if self.symbol == message['symbol']:
            await self.send({
                'type': 'websocket.send',
                'text': json.dumps(message),
            })

为了更加灵活,您也可以接受来自客户端的所有交易品种列表,而不是:

//HTML
socket = new WebSocket("ws://.../?symbols=XXX,YYY,ZZZ");

然后(在消费者中):

class EchoConsumer(AsyncJsonWebsocketConsumer):

    symbols = []

    async def connect(self):
        # here we need to parse "?symbols=XXX,YYY,ZZZ" ...
        # the code below has been stolen from another project of mine and should be suitably adapted
        params = urllib.parse.parse_qs(self.scope.get('query_string', b'').decode('utf-8'))
        try:
            self.symbols = json.loads(params.get('symbols', ['[]'])[0])
        except:
            self.symbols = []

    def process_message(self, message):
        if message['symbol'] in self.symbols:
            ...