如何在 Flask-SocketIO 中从线程向某个特定的套接字发送(emit)消息?

How to emit to a certain socket from a thread in flask socketio?

我想隔离每个套接字连接,并且只向那个特定的连接发送消息。不使用线程运行时,一切正常,每个套接字只会收到自己的消息;但是一旦使用线程,它就开始广播,所有客户端都会收到所有消息。

这是我的客户端的样子:

// Open the Socket.IO connection to the server.
var socket = io.connect('https://my_server_link');

 // Log once the connection has been established.
 socket.on('connect', () => {
  console.log('Connection Success');
 });

 // Receives the session id from the server, stores it in the shared
 // `session_id` variable and asks the server to verify this client.
 socket.on('connection_status', function (data) {
  console.log('Connection Status');
  console.log(data);
  session_id = data.session_id;
  const payload = {'session_id': session_id, 'api_id': api_id};
  socket.emit('verifyClient', payload);
 });

 // Logs the verification result; on success, requests user setup for this session.
 socket.on('verification_status', function (data) {
  console.log('Verification Status');
  console.log(data);
  if (!data.status) {
   return;
  }
  console.log('Verified');
  socket.emit('setupUser', {'session_id': session_id, 'user_id': user_id});
 });

 // Logs the user setup result; on success, starts sending a frame every 5 seconds.
 socket.on('user_setup_status', function (data) {
  console.log('User Setup Status');
  console.log(data);
  if (!data.status) {
   return;
  }
  console.log('User Setup Successful, Starting monitoring...');
  // The timer handle is kept in the shared `running_ops` variable.
  running_ops = setInterval(function () {
   op_interval(session_id);
  }, 5000);
 });

 // Draws the current video image onto the canvas context, encodes the canvas
 // as a JPEG and sends it to the server in a 'monitor' event.
 function op_interval(session_id) {
  ctx2.drawImage(video, 0, 0, video.width, video.height);
  // `frame` is deliberately left undeclared, as in the original, so it stays a shared variable.
  frame = to_blob(canva.toDataURL('image/jpeg'));
  const message = {'frame': frame, 'session_id': session_id};
  socket.emit('monitor', message);
 }

 // When the server reports a problem it emits an 'alert' event; log its payload.
 socket.on('alert', function (alertData) {
  console.log(alertData);
  // TODO: handle alerts here
 });

这是我的服务器端。

# Flask application object that the Socket.IO server below is attached to.
app = Flask(__name__)

# Enable CORS on the app with a wildcard resource pattern and wildcard origins.
CORS(app, resources={"*": {"origins": "*"}})

# eventlet monkey-patching is left disabled; the server is created with
# async_mode='threading' instead.
# eventlet.monkey_patch()

# Socket.IO server bound to the Flask app; cors_allowed_origins="*" is passed
# so that cross-origin clients are accepted.
socketio = SocketIO(app, async_mode='threading', cors_allowed_origins="*")

# Set up the socket connection to the client and register a session id
@socketio.on('connect')
def handle_connection():
 """Register a new connection under a random, self-generated session id.

 The id is 10 random lowercase letters. The connection joins a room named
 after that id, and the id is sent back in a 'connection_status' event.
 """
 print('Connection Successfull')
 letters = string.ascii_lowercase
 session_id = ''.join(random.choice(letters) for _ in range(10))
 # request.sid is not recorded here; only the generated id is stored.
 active_socket_sessions[session_id] = dict(session_id=session_id)
 join_room(session_id)
 payload = {'status': 'success', 'session_id': session_id}
 emit('connection_status', payload, room=session_id)


@socketio.on('verifyClient')
def verification(data):
 """Verify the client of a session and report the result.

 Reads 'session_id' and 'api_id' from ``data``. For an unknown session a
 'verification_status' event with status False is emitted without a room;
 otherwise the result of ``verify_client`` is emitted to the session's room,
 and on success the client id is stored on the session.
 """
 session_id = data['session_id']
 client_id = data['api_id']

 if session_id not in active_socket_sessions:
  emit('verification_status', {'status': False, 'message': "Session not found.", 'session_id': session_id})
  return

 client, message = verify_client(database, client_id, active_sessions)

 # A non-empty client record means the verification succeeded.
 verified = len(client.keys()) > 0
 if verified:
  active_socket_sessions[session_id]['client_id'] = client['client_id']

 result = {'status': verified, 'message': message, 'session_id': session_id}
 emit('verification_status', result, room=session_id)


@socketio.on('setupUser')
def user_setup(data):
 """Look up the user for a session and report the result.

 Reads 'session_id' and 'user_id' from ``data``. For an unknown session a
 'user_setup_status' event with status False is emitted without a room.
 Otherwise the user is fetched with ``get_user``; when one is found, the
 session (including the user) is written to Redis under the session id and
 the in-memory entry is reset to an empty dict. The result is emitted to the
 session's room.
 """
 session_id = data['session_id']
 user_id = data['user_id']

 if session_id not in active_socket_sessions:
  emit('user_setup_status', {'status': False, 'message': 'Session not found.', 'session_id': session_id})
  return

 session = active_socket_sessions[session_id]
 client_id = session['client_id']
 user, message = get_user(database, {'client_id': client_id}, user_id)

 # A non-empty user record means the user was found.
 found = len(user.keys()) > 0
 if found:
  # The encoding is stored as its string representation.
  user['person_encoding'] = str(user['person_encoding'])
  session['user'] = user
  rediss.set_dict(session_id, session)
  active_socket_sessions[session_id] = {}

 result = {'status': found, 'message': message, 'session_id': session_id}
 emit('user_setup_status', result, room=session_id)


@socketio.on('disconnect')
def handle_disconnect():
 """Emit a 'connection_ended' notice when a client disconnects."""
 # Clearing the socket from Redis and closing the connection are not handled here yet.
 farewell = {'data': 'Successfully Disconnected, Socket Is Closed.'}
 emit('connection_ended', farewell)


@socketio.on('monitor')
def monitor(data):
 """Hand the received frame and session id to a background task.

 ``data`` must contain 'frame' and 'session_id'.
 """
 task_args = (data['frame'], data['session_id'])
 socketio.start_background_task(monitoring_process, *task_args)


def monitoring_process(frame, session_id):
 """Run the model on one frame and emit an 'alert' if the user is not recognized.

 Does nothing when ``session_id`` is not in ``active_socket_sessions``.
 Otherwise the session is loaded from Redis, the frame is decoded and passed
 to the model together with the stored user encoding. When the prediction is
 not 'recognized', an alert is logged with ``log_alert`` and the prediction
 is emitted to the room named ``session_id``.
 """
 if session_id not in active_socket_sessions:
  return

 session = rediss.get_dict(session_id)
 client_id = session['client_id']
 user = session['user']
 # The encoding was stored as a string, so it is evaluated back into a Python value.
 user_encoding = np.array(eval(user['person_encoding']))
 person_id = user['person_id']

 image = cv2.imdecode(np.frombuffer(frame, np.uint8), -1)
 preds = model.model.predict(image, user_encoding)
 if preds['recognized']:
  return

 # The alert entry is looked up by the prediction message and filled in place.
 alert = vars.BRISO_ALERTS[preds['message']]
 alert['timestamp'] = str(time.time())
 alert['log_id'] = ''.join(random.choice(string.ascii_lowercase) for _ in range(10))
 alert['person_id'] = person_id
 # No browser information is available here, so empty headers are logged.
 log_alert(database, client_id, alert, {})
 preds['session_id'] = session_id
 socketio.emit('alert', preds, room=session_id)


# Start the Socket.IO server only when this file is executed as a script.
if __name__ == "__main__":
 # Host and port come from the project's `vars` module; debug mode and the
 # reloader are both switched off.
 socketio.run(app, host=vars.HOST_NAME, port=vars.SERVER_PORT, debug=False, use_reloader=False)

好的,正如 Miguel Grinberg 本人在我于 GitHub 上向他提问时所建议的那样,我唯一需要做的就是用 request.sid 代替我自己生成的 session_id,并且不再调用 join_room 方法。(Flask-SocketIO 会在每个客户端连接时自动把它加入一个以其 request.sid 命名的房间,因此使用 room=request.sid 发送就只会发给该客户端。)

更新后的(工作)代码如下所示:

对于客户端socket.io:

// Log once the connection has been established.
socket.on('connect', () => {
  console.log('Connection Success');
 });

 // Receives the session id, then verifies the client and sets up the user.
 // Each step is confirmed through the emit acknowledgement callback, and
 // monitoring only starts when both steps report success.
 socket.on('connection_status', (data)=>{
  console.log('Connection Status')
  console.log(data);
  session_id = data['session_id']

  // The server acknowledges with a JSON string containing a `status` flag;
  // an already-parsed object is accepted as well. Anything unparsable counts as failure.
  const ack_ok = (ack)=>{
   try {
    const result = (typeof ack === 'string') ? JSON.parse(ack) : ack
    return Boolean(result && result['status'])
   } catch (err) {
    return false
   }
  }

  socket.emit('verify_client', {'session_id': session_id, 'api_id': api_id}, (status)=>{
   if (!ack_ok(status)){
    // Do not continue with an unverified client.
    console.log('Verification failed')
    return
   }
   socket.emit('setup_user', {'session_id': session_id, 'user_id': user_id}, (status)=>{
    if (!ack_ok(status)){
     // Do not start monitoring without a configured user.
     console.log('User setup failed')
     return
    }
    // Send a frame every 5 seconds; the timer handle is kept in `running_ops`.
    running_ops = setInterval(()=>{
     op_interval(session_id)
    }, 5000)
   })
  })
 })
  
  // Draws the current video image onto the canvas context, encodes the canvas
  // as a JPEG and sends it to the server in a 'monitor' event.
  function op_interval(session_id) {
   ctx2.drawImage(video, 0, 0, video.width, video.height);
   // `frame` is deliberately left undeclared, as in the original, so it stays a shared variable.
   frame = to_blob(canva.toDataURL('image/jpeg'));
   const message = {'frame': frame, 'session_id': session_id};
   socket.emit('monitor', message);
  }
  
  // When the server reports a problem it emits an 'alert' event; log its payload.
  socket.on('alert', function (alertData) {
   console.log(alertData);
  });

对于 Python 端的 Flask-SocketIO 服务器:

# Set up the socket connection to the client and register its session id
@socketio.on('connect')
def handle_connection():
 """Register a new connection under its Socket.IO sid.

 The sid (``request.sid``) is used as the session id: it is stored in
 ``active_socket_sessions`` and sent back in a 'connection_status' event
 addressed to the room named after that sid.
 """
 print('Connection Successfull')
 sid = request.sid
 active_socket_sessions[sid] = dict(session_id=sid)
 payload = {'status': 'success', 'session_id': sid}
 emit('connection_status', payload, room=sid)


@socketio.on('verify_client')
def verification(data):
 """Verify the client of the calling connection.

 Reads 'api_id' from ``data``; the session is identified by ``request.sid``.
 On success the client id is stored on the session.

 Returns:
  A JSON string with 'status', 'message' and 'session_id'. For an unknown
  session the status is False and the message is "Session not found.".
 """
 sid = request.sid
 api_id = data['api_id']
 verified, message = False, "Session not found."

 if sid in active_socket_sessions:
  client, message = verify_client(database, api_id, active_sessions)
  # A non-empty client record means the verification succeeded.
  verified = len(client.keys()) > 0
  if verified:
   active_socket_sessions[sid]['client_id'] = client['client_id']

 # Both outcomes are reported with the same payload shape.
 return json.dumps({'status': verified, 'message': message, 'session_id': sid})

@socketio.on('setup_user')
def user_setup(data):
 """Look up the user for the calling connection and persist the session.

 Reads 'user_id' from ``data``; the session is identified by ``request.sid``.
 When a user is found, the session (including the user) is written to Redis
 under the sid and the in-memory entry is reset to an empty dict.

 Returns:
  A JSON string with 'status', 'message' and 'session_id'. The status is
  False when the session is unknown, when the session holds no verified
  client id, or when no user is found.
 """
 session_id = request.sid
 user_id = data['user_id']
 found = False
 message = 'Session not found.'

 if session_id in active_socket_sessions:
  session = active_socket_sessions[session_id]
  if 'client_id' not in session:
   # The client id is only stored after a successful verification (and the
   # entry is emptied after setup), so report a failure instead of raising KeyError.
   message = 'Client not verified.'
  else:
   user, message = get_user(database, {'client_id': session['client_id']}, user_id)

   if user:
    found = True
    # The encoding is stored as its string representation.
    user['person_encoding'] = str(user['person_encoding'])
    session['user'] = user
    rediss.set_dict(session_id, session)
    active_socket_sessions[session_id] = {}

 return json.dumps({'status': found, 'message': message, 'session_id': session_id})


@socketio.on('disconnect')
def handle_disconnect():
 """Forget the disconnecting client's session and emit a 'connection_ended' notice."""
 # Drop the in-memory entry so sessions do not accumulate for every connection;
 # background tasks for this sid then skip their work. A missing entry is fine.
 active_socket_sessions.pop(request.sid, None)
 # TODO: also clear the session stored in Redis under this sid.
 emit('connection_ended', {'data': 'Successfully Disconnected, Socket Is Closed.'})


@socketio.on('monitor')
def monitor(data):
 """Schedule analysis of the received frame for the calling connection.

 Reads 'frame' from ``data``; the session is identified by ``request.sid``.
 """
 frame = data['frame']
 # Read the sid here and pass it along as an argument to the background task.
 session_id = request.sid

 # Run the analysis exactly once, in the background. Also starting and joining
 # a Thread here would block the handler and process every frame twice,
 # which duplicates log entries and alerts.
 socketio.start_background_task(monitoring_process, frame, session_id)


def monitoring_process(frame, session_id):
 """Run the model on one frame and emit an 'alert' if the user is not recognized.

 Does nothing when ``session_id`` is not in ``active_socket_sessions``.

 Args:
  frame: Buffer holding the encoded image; it is decoded with ``cv2.imdecode``.
  session_id: Sid of the connection. It is used as the Redis key of the
   session and as the room the alert is emitted to.
 """
 if session_id not in active_socket_sessions:
  return

 session = rediss.get_dict(session_id)
 client_id = session['client_id']
 user = session['user']
 # NOTE(review): eval() executes whatever expression is stored under
 # 'person_encoding'. If that value can be influenced from outside, replace
 # this with a safe parser such as ast.literal_eval or JSON.
 user_encoding = np.array(eval(user['person_encoding']))
 person_id = user['person_id']
 browser_info = {}

 frame = cv2.imdecode(np.frombuffer(frame, np.uint8), -1)
 preds = model.model.predict(frame, user_encoding)

 if not preds['recognized']:
  # Work on a copy: vars.ALERTS is shared, and filling the entry in place
  # would let concurrent tasks overwrite each other's timestamp, log id and person id.
  alert = dict(vars.ALERTS[preds['message']])
  alert['timestamp'] = str(time.time())
  alert['log_id'] = ''.join(random.choice(string.ascii_lowercase) for _ in range(10))
  alert['person_id'] = person_id
  log_alert(database, client_id, alert, browser_info)
  preds['session_id'] = session_id
  # Every client is in a room named after its sid, so this reaches only that client.
  socketio.emit('alert', preds, room=session_id)