Keeping track of clients across multiple servers

I've been designing a chat system for fun over the past few months, but I haven't been able to find much material on load balancing.

So far, my architecture consists of a single WebSocket server (though for simplicity's sake, the WebSocket layer will be left out of this topic), a MySQL database that stores user accounts and chat information, and a PHP-driven website running on nginx.

I've considered using memcached to hold the list of chats, with a reference to each connected client, but I'm not sure how to go about the messaging/queue system for notifying the other connected clients when a message is sent or a user has joined/quit (Redis?).
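
To make that part of the question concrete, here is roughly what I picture the Redis pub/sub side looking like; this is only a sketch, and the "chat-events:" channel naming is something I made up for illustration:

import json
import redis

r = redis.StrictRedis(host="127.0.0.1", port=6379)

def broadcast(room, event, payload):
    # Any server can publish; every server subscribed to the room's channel
    # receives the event and can fan it out to its locally connected clients.
    r.publish("chat-events:" + room, json.dumps({"event": event, "data": payload}))

def events(room):
    # Each server runs one blocking listener per room it has clients in.
    pubsub = r.pubsub()
    pubsub.subscribe("chat-events:" + room)
    for message in pubsub.listen():
        if message["type"] == "message":
            yield json.loads(message["data"].decode("utf-8"))

# e.g. broadcast("lobby", "join", {"client": 417})
#      broadcast("lobby", "message", {"from": 417, "text": "hi"})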

At the end of the day, there are other potential pitfalls to this concurrency problem, namely: should I abstract the processing layer away from the socket layer, so that the processing layer doesn't have to worry about clients disconnecting mid-processing? Or should I leave that to the socket layer?

In my memcached example, I could store all of the relevant client information in that ramdisk and request/update it as I see fit. Is that an acceptable way of going about it?
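
For instance, with Redis in place of memcached, I imagine the per-client state looking something like the sketch below; the key scheme mirrors the test server further down, and hset with the mapping keyword assumes redis-py 3.5+:

import redis

r = redis.StrictRedis(host="127.0.0.1", port=6379)

def save_client(fileno, **fields):
    # One hash per client, e.g. server0:client-417 -> {"chat": "lobby"}.
    # hset(name, mapping=...) requires redis-py 3.5+.
    r.hset("server0:client-" + str(fileno), mapping=fields)

def load_client(fileno):
    raw = r.hgetall("server0:client-" + str(fileno))
    return {k.decode("utf-8"): v.decode("utf-8") for k, v in raw.items()}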

Preferably, I'd like some material to read so I can figure this out myself rather than just getting the answer from someone here, and I'd like to be able to use it as a lesson in scalability for the future, should I ever design something like this again.

Here's the test server I've made:

import multiprocessing
import socket, select
import redis, json

# The basic idea of this server is to have cores - 1 processes running to
# munch data, and one "master" process handling all of the client
# connections and whatnot.
#
# Scalability is simple: treat any other process as another server and
# there will be no cross-coding required.
#
# Sending data:
#   master_pipe.send( {"__id__": "SEND_DATA", "fileno": "417", "data": "3025561204"} )
#
# Closing a socket:
#   master_pipe.send( {"__id__": "CLOSE_SOCKET", "fileno": 417} )
def Worker(worker_index, worker_queue, master_pipe):
    memory = redis.StrictRedis(host="127.0.0.1", port=6379)

    # Each queue item is the (fileno, data) tuple that Master put there.
    for client_id, *args in iter(worker_queue.get, None):
        raw = memory.get("server0:client-" + str(client_id))
        if raw is None:
            # The client disconnected before this job was picked up.
            continue
        client = json.loads(raw.decode("utf-8"))

        if args[0][:5] == "join:":
            # Remember which chat the client is in, and add it to the
            # chat's membership list.
            client["chat"] = str(args[0][5:])
            memory.set("server0:client-" + str(client_id), json.dumps(client).encode("utf-8", "ignore"))
            memory.lpush("chat:" + client["chat"], client["__id__"])

        elif args[0][:7] == "online:":
            if "chat" in client:
                print(memory.lrange("chat:" + client["chat"], 0, -1))


def Master(master_pipe, workers):
    memory = redis.StrictRedis(host="127.0.0.1", port=6379)

    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    server.bind(("0.0.0.0", 7777))
    server.listen(socket.SOMAXCONN)

    epoll = select.epoll()
    epoll.register(server.fileno(), select.EPOLLIN)
    epoll.register(master_pipe.fileno(), select.EPOLLIN)

    sockets, i = {}, 0  # open client sockets by fileno; i drives round-robin dispatch
    while True:
        for fileno, e_bits in epoll.poll():
            try:
                if fileno == server.fileno():
                    sock, (addr, port) = server.accept()
                    sockets[sock.fileno()] = sock
                    epoll.register(sock.fileno(), select.EPOLLIN | select.EPOLLHUP)

                    # Register the new client in Redis so the workers can see it.
                    client_object = {"__id__": sock.fileno()}
                    memory.set("server0:client-" + str(sock.fileno()), json.dumps(client_object).encode("utf-8", "ignore"))

                elif fileno == master_pipe.fileno():
                    print(master_pipe.recv())

                elif e_bits & select.EPOLLIN:
                    recv = sockets[fileno].recv(1024).decode("utf-8", "ignore").rstrip("\r\n")
                    if not recv:
                        # An empty read means the peer closed the connection.
                        raise socket.error
                    # Hand the message to a worker, round-robin by event count.
                    workers[i % len(workers)].put((fileno, recv))

            except socket.error:
                sockets[fileno].close()
                del sockets[fileno]

                # Clean up the client's Redis state, if it still exists.
                raw = memory.get("server0:client-" + str(fileno))
                if raw is not None:
                    client = json.loads(raw.decode("utf-8"))
                    if "chat" in client:
                        # lrem(name, count, value); count=0 removes every match.
                        memory.lrem("chat:" + client["chat"], 0, client["__id__"])
                    memory.delete("server0:client-" + str(fileno))

            finally:
                i += 1  # advance the round-robin counter


if __name__ == "__main__":
    workers = []
    master_pipe, worker_pipe = multiprocessing.Pipe()

    # One worker per core, leaving a core free for the Master process.
    for i in range(max(1, multiprocessing.cpu_count() - 1)):
        workers.append(multiprocessing.Queue())

        p = multiprocessing.Process(target=Worker, args=(i, workers[-1], worker_pipe))
        p.daemon = True
        p.start()

    Master(master_pipe, workers)
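
For what it's worth, this is how I've been testing it (epoll makes it Linux-only): connect with netcat and type the two commands the worker understands, where "join:lobby" joins a chat and "online:" prints that chat's membership list on the server side:

$ nc 127.0.0.1 7777
join:lobby
online: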