通过 websocket 发送的文件在收到时变得太大

File sent trough websocket becomes too large when received

我正在尝试在 Python 中使用 asyncio 学习 websockets。我已经实现了一个 websocket 服务器,它接收二进制数据并输出到另一台计算机。

问题是当数据到达另一台计算机时,生成的文件非常大。对于小文件(如 2 行等的 .txt 文件),它运行良好,但对于大文件(大约 5 mb 及以上),接收方计算机中的结果文件为 4gb。

我找不到造成这种情况的原因。无论我做什么,发送方文件大小和接收方文件大小永远不匹配。

一些代码:

FileManager.py

class Manager():
    BUFFER_SIZE = 8092
    file = None
    filesize = None
    filename = None
    received_file = bytearray()
    sent = 0
    lock = asyncio.Lock()
    secret = None
    ws = None

    def __init__(self,  secret=None, ws: websockets.WebSocketServerProtocol = None):
        self.ws = ws
        self.secret = secret

    def open_file(self, filename, mode):
        self.file = open(filename, mode)

    def close_file(self):
        self.file.close()

    async def chunk_sender(self):
        async with self.lock:
            self.file.seek(self.sent)
            bytes_read = self.file.read(self.BUFFER_SIZE)
            await self.ws.send(json.dumps({
                "cmd": "send",
                "key": self.secret,
                "data": bytes_read.decode("utf-8")
            }))
            self.sent += self.BUFFER_SIZE

    async def chunk_receiver(self, binary):
        async with self.lock:
            self.received_file += binary
            self.file.write(self.received_file)
            perc = ((len(self.received_file) * 100)/self.filesize)
            print("\rDownloading file: " + colored(str(round(perc, 2)) + "%", "magenta"), end='', flush=True)


    async def start_sending(self):
        self.open_file(self.filename, "rb")
        spawn = math.ceil(self.filesize / self.BUFFER_SIZE)
        tasks = []
        for _ in range(spawn):
            tasks.append(self.chunk_sender())

        pbar = tqdm.tqdm(total=len(tasks), leave=True, mininterval=0)

        for process in asyncio.as_completed(tasks):
            value = await process
            pbar.set_description(value)
            pbar.update()

ClientManager.py

import websockets
import json
from termcolor import colored
from classes import File


class Manager:
    SERVER_URL = None
    filename = None
    filesize = 0
    secret = None
    FileManager = File.Manager()

    def __init__(self, SERVER_URL, filename, filesize, secret):
        self.SERVER_URL = SERVER_URL
        self.filename = filename
        self.filesize = filesize
        self.secret = secret

        # Initialize FileManager
        self.FileManager.secret = self.secret
        self.FileManager.filesize = self.filesize
        self.FileManager.filename = self.filename

    async def start_sender(self):
        async with websockets.connect(self.SERVER_URL) as ws:
            self.FileManager.ws = ws
            await ws.send(json.dumps({"cmd": "sender_init", "key": self.secret}))
            print("Now in the receiver computer", end=" "), print(
                colored("sendpai " + self.secret, "magenta"))
            while True:
                message = await ws.recv()
                deserialized = json.loads(message)
                cmd = deserialized["cmd"]
                if cmd == "receiver_request":
                    await self.FileManager.start_sending()
                elif cmd == "receiver_init":
                    await ws.send(json.dumps({"cmd": "file_details", "key": self.secret, "filename": self.filename, "filesize": self.filesize}))

    async def start_receiver(self):
        async with websockets.connect(self.SERVER_URL) as ws:
            self.FileManager.ws = ws
            await ws.send(json.dumps({"cmd": "receiver_init", "key": self.secret}))
            while True:
                message = await ws.recv()
                deserialized = json.loads(message)
                if "cmd" in deserialized:
                    cmd = deserialized["cmd"]
                    if cmd == "send":
                        if "data" in deserialized:
                            binary_chunk = bytes(
                                deserialized["data"], encoding="utf-8")
                            await self.FileManager.chunk_receiver(binary_chunk)
                    elif cmd == "file_details":
                        self.FileManager.filename = deserialized["filename"]
                        self.FileManager.filesize = deserialized["filesize"]
                        self.FileManager.open_file("hello", "wb")
                        await ws.send(json.dumps({"cmd": "receiver_request", "key": self.secret}))
                        print("[The file is about to be downloaded]")
                        print(
                            "filename: " + colored(str(self.FileManager.filename), "green"), end=" ")
                        print(
                            "filesize: " + colored(str(self.FileManager.filesize / 1000) + "mb", "yellow"))


Server.py

class Server():
    clients = []
    clients_lock = threading.Lock()

    async def register(self, ws: websockets.WebSocketServerProtocol, key, who) -> None:
        with self.clients_lock:
            self.clients.append({"key": key, "ws": ws, "who": who})
        logging.info(who + f' {ws.remote_address[0]} connects')

    async def unregister(self, ws: websockets.WebSocketServerProtocol) -> None:
        with self.clients_lock:
            for client in self.clients:
                if client["ws"] == ws:
                    del client
        logging.info(f'{ws.remote_address[0]} connects')

    async def init_event(self, ws: websockets.WebSocketServerProtocol, key: str, who:str) -> None:
        await self.register(ws, key, who)
        logging.info(f'{ws.remote_address[0]} with key f{key}')

    async def receiver_request_event(self, ws: websockets.WebSocketServerProtocol, key: str) -> None:
        await self.register(ws, key, "receiver")
        for client in self.clients:
            if client["key"] == key:
                await client["ws"].send(json.dumps({"cmd": "receiver_request"}))

    async def send_to_receiver(self, key, message):
        for client in self.clients:
            if(client["key"] == key and client["who"] == "receiver"):
                await client["ws"].send(message)

    async def send_to_sender(self, key, message):
        for client in self.clients:
            if(client["key"] == key and client["who"] == "sender"):
                await client["ws"].send(message)

    async def ws_handler(self, ws: websockets.WebSocketServerProtocol, uri: str):
        try:    
            async for message in ws:
                deserialized = json.loads(message)
                cmd = deserialized["cmd"]
                key = deserialized["key"]
                if cmd == "sender_init":
                    await self.init_event(ws, key, "sender")
                elif cmd == "receiver_request":
                    await self.receiver_request_event(ws, key)
                elif cmd == "send":
                    await self.send_to_receiver(key, message)
                elif cmd == "receiver_init":
                    await self.init_event(ws, key, "receiver")
                    await self.send_to_sender(key, message)
                elif cmd == "file_details":
                    await self.send_to_receiver(key, message)
        except websockets.exceptions.ConnectionClosed as e:
            logging.info("Connection closed")

我试图调试我的代码以搜索:

我注意到的一件事是我需要锁定 chunk_sender 函数,因为当协程到达时我多次从同一个指针读取数据,这改进了一些事情但仍然存在问题。

提前致谢。

没关系,2 个错误:

客户端在服务器事件中被添加到我的列表中两次:

self.receiver_request_event(ws, key)

self.init_event(ws, key, "receiver")

所以,当发送 websocket 消息时,我收到了两次。

而且,还有这个,我每次都应该写给定的二进制文件而不是已经发送的字节:

    async def chunk_receiver(self, binary):
        async with self.lock:
            self.received_file += binary
            self.file.write(self.received_file)

...