Unity处理来自Socket的数据流
Unity Processing Data Stream from Socket
背景
我有一个简单的工作服务器,它从数据收集设备发送二进制文件,套接字连接有效,我的 Unity 客户端和其他客户端(如 matlab 客户端)都可以正确接收数据。
然后 Unity 中的客户端尝试从该服务器接收数据并将每个数据包中的字节分割成 3D 坐标数组。
每一帧应该是 512x424 像素大,所以我需要等到数据填满 512x424 字节并进行分割
问题
Unity在套接字连接和接收阶段运行良好,在处理阶段会卡住(停顿):ProcessFrame((byte[])state.buffer.Clone())
我读过这篇文章post:
Getting UdpClient Receive data back in the Main thread
并相应地更改了我的代码,但问题仍然存在。
我是不是做错了什么?感谢您的帮助:)
代码---客户端
public class SomeClass: MonoBehaviour {
public GameObject sphere;
const int IMGW = 512;
const int IMGH = 424;
const int MAXBODY = 6;
const int NUMJOINT = 25;
const int READ_BUFFER_SIZE = (4 * 3 + 4 * 1 + 4 * 1) * (IMGW * IMGH + MAXBODY * NUMJOINT);
const int PORT_NUM = 20156;
public string response = String.Empty;
private Queue queue;
private System.Object queueLock;
private int bytesRead;
// ManualResetEvent instances signal completion.
private static ManualResetEvent connectDone =
new ManualResetEvent(false);
private static ManualResetEvent sendDone =
new ManualResetEvent(false);
private static ManualResetEvent receiveDone =
new ManualResetEvent(false);
// State object for receiving data from remote device.
public class StateObject
{
// Client socket.
public Socket workSocket = null;
// Receive buffer.
public const int BufferSize = (4 * 3 + 4 * 1 + 4 * 1) * (IMGW * IMGH + MAXBODY * NUMJOINT);
public byte[] buffer = new byte[BufferSize];
}
// We use this to keep tasks needed to run in the main thread
private static readonly Queue<Action> tasks = new Queue<Action>();
// Use this for initialization
void Start () {
queueLock = new object();
queue = new Queue();
this.StartClient();
// Test object
sphere = GameObject.CreatePrimitive(PrimitiveType.Sphere);
sphere.transform.position = new Vector3(0, -1, 10);
}
// Update is called once per frame
void Update () {
this.HandleTasks();
}
void HandleTasks()
{
while (tasks.Count > 0)
{
Action task = null;
lock (tasks)
{
if (tasks.Count > 0)
{
task = tasks.Dequeue();
}
}
task();
}
}
public void QueueOnMainThread(Action task)
{
lock (tasks)
{
tasks.Enqueue(task);
}
}
private void StartClient()
{
try
{
IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
IPEndPoint remoteEP = new IPEndPoint(ipAddress, PORT_NUM);
Socket client = new Socket(AddressFamily.InterNetwork,
SocketType.Stream, ProtocolType.Tcp);
// Connect to the remote endpoint.
client.BeginConnect(remoteEP, new AsyncCallback(ConnectCallback), client);
connectDone.WaitOne();
Receive(client);
receiveDone.WaitOne();
Console.WriteLine("Response received : {0}", response);
// Release the socket
client.Shutdown(SocketShutdown.Both);
client.Close();
}
catch (Exception e)
{
Debug.Log(e.ToString());
}
}
private void ConnectCallback(IAsyncResult ar)
{
try
{
// Retrieve the socket from the state object.
Socket client = (Socket)ar.AsyncState;
// Complete the connection.
client.EndConnect(ar);
// Signal that the connection has been made.
connectDone.Set();
}
catch (Exception e)
{
String error = e.ToString();
Console.WriteLine(e.ToString());
fnDisconnect();
}
}
private void Receive(Socket client)
{
try
{
// Create the state object.
StateObject state = new StateObject();
state.workSocket = client;
bytesRead = 0;
// Begin receiving the data from the remote device.
client.BeginReceive(state.buffer, bytesRead, StateObject.BufferSize, 0,
new AsyncCallback(ReceiveCallback), state);
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}
private void ReceiveCallback(IAsyncResult ar)
{
try
{
// Retrieve the state object and the client socket
// from the asynchronous state object.
StateObject state = (StateObject)ar.AsyncState;
Socket client = state.workSocket;
// Read data from the remote device.
int numOfBytesRead = client.EndReceive(ar);
if (numOfBytesRead > 0)
{
bytesRead += numOfBytesRead;
if (bytesRead == StateObject.BufferSize)
{
this.QueueOnMainThread(() =>
{
// All the data has arrived; put it in response.
ProcessFrame((byte[])state.buffer.Clone());
});
Receive(client);
}
else {
// Get the rest of the data.
client.BeginReceive(state.buffer, bytesRead, StateObject.BufferSize - bytesRead, 0,
new AsyncCallback(ReceiveCallback), state);
}
}
else
{
receiveDone.Set();
}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}
代码---数据处理
private void ProcessFrame(byte[] buffer)
{
byte[] bufferCopy = (byte[])buffer.Clone();
double[,,] XYZArray = new double[IMGH, IMGW, 3];
byte[,] DepthArray = new byte[IMGH, IMGW];
byte[,,] RGBArray = new byte[IMGH, IMGW, 3];
for (int i = 0; i < IMGW; i++)
{
for (int j = 0; j < IMGH; j++)
{
int index = (i * IMGW + j) * 20;
//byte[] arr = {bufferCopy[index], bufferCopy[index + 1], bufferCopy[index + 2], bufferCopy[index + 3] };
float v = System.BitConverter.ToSingle(bufferCopy, index);
if (!float.IsInfinity(v) && !float.IsNaN(v))
{
XYZArray[i, j, 0] = v;
}
//arr = new byte[]{bufferCopy[index + 4], bufferCopy[index + 5], bufferCopy[index + 6], bufferCopy[index + 7] };
v = System.BitConverter.ToSingle(bufferCopy, index + 4);
if (!float.IsInfinity(v) && !float.IsNaN(v))
{
XYZArray[i, j, 1] = v;
}
v = System.BitConverter.ToSingle(bufferCopy, index + 8);
if (!float.IsInfinity(v) && !float.IsNaN(v))
{
XYZArray[i, j, 2] = v;
}
// Debug.Log("for loop called");
DepthArray[i, j] = bufferCopy[index + 12];
RGBArray[i, j, 2] = bufferCopy[index + 16]; // B
RGBArray[i, j, 1] = bufferCopy[index + 17]; // G
RGBArray[i, j, 0] = bufferCopy[index + 18]; // R
}
}
this.EnQueue(XYZArray);
}
private void EnQueue(System.Object obj)
{
lock (queueLock)
{
queue.Enqueue(obj);
}
}
private bool DeQueue(System.Object outObj)
{
bool success = false;
lock (queueLock)
{
if (queue.Count > 0)
{
outObj = queue.Dequeue();
success = true;
}
}
return success;
}
public int lengthOfQueue()
{
int count = -1;
lock (queueLock)
{
count = queue.Count;
}
return count;
}
public double[,,] getXYZArray()
{
double[,,] retVal = new double[,,] { };
this.DeQueue(retVal);
return retVal;
}
更新
感谢@Programmer 的建议,我按照 link (s) 他提供的方法获得了一个可用的套接字客户端。
整个代码一团糟。您应该使用 Thread
而不是 Async
来以更少的代码完成该操作。
任何方式,将 byte[] bufferCopy = (byte[])buffer.Clone();
行代码替换为
byte[] bufferCopy = new byte[buffer.Length];
System.Buffer.BlockCopy(buffer, 0, bufferCopy, 0, bufferCopy.Length);
而对于 ProcessFrame((byte[])state.buffer.Clone());
,只需传入数据而不克隆它。所以应该用
代替
ProcessFrame(state.buffer);
假设这是您代码中的唯一问题,这应该可以解决问题。
编辑:
一个完整的Unity TCP服务器代码。将代码移植到 UDP,这应该适合你。
背景
我有一个简单的工作服务器,它从数据收集设备发送二进制文件,套接字连接有效,我的 Unity 客户端和其他客户端(如 matlab 客户端)都可以正确接收数据。
然后 Unity 中的客户端尝试从该服务器接收数据并将每个数据包中的字节分割成 3D 坐标数组。
每一帧应该是 512x424 像素大,所以我需要等到数据填满 512x424 字节并进行分割
问题
Unity在套接字连接和接收阶段运行良好,在处理阶段会卡住(停顿):ProcessFrame((byte[])state.buffer.Clone())
我读过这篇文章post: Getting UdpClient Receive data back in the Main thread 并相应地更改了我的代码,但问题仍然存在。
我是不是做错了什么?感谢您的帮助:)
代码---客户端
public class SomeClass: MonoBehaviour { public GameObject sphere; const int IMGW = 512; const int IMGH = 424; const int MAXBODY = 6; const int NUMJOINT = 25; const int READ_BUFFER_SIZE = (4 * 3 + 4 * 1 + 4 * 1) * (IMGW * IMGH + MAXBODY * NUMJOINT); const int PORT_NUM = 20156; public string response = String.Empty; private Queue queue; private System.Object queueLock; private int bytesRead; // ManualResetEvent instances signal completion. private static ManualResetEvent connectDone = new ManualResetEvent(false); private static ManualResetEvent sendDone = new ManualResetEvent(false); private static ManualResetEvent receiveDone = new ManualResetEvent(false); // State object for receiving data from remote device. public class StateObject { // Client socket. public Socket workSocket = null; // Receive buffer. public const int BufferSize = (4 * 3 + 4 * 1 + 4 * 1) * (IMGW * IMGH + MAXBODY * NUMJOINT); public byte[] buffer = new byte[BufferSize]; } // We use this to keep tasks needed to run in the main thread private static readonly Queue<Action> tasks = new Queue<Action>(); // Use this for initialization void Start () { queueLock = new object(); queue = new Queue(); this.StartClient(); // Test object sphere = GameObject.CreatePrimitive(PrimitiveType.Sphere); sphere.transform.position = new Vector3(0, -1, 10); } // Update is called once per frame void Update () { this.HandleTasks(); } void HandleTasks() { while (tasks.Count > 0) { Action task = null; lock (tasks) { if (tasks.Count > 0) { task = tasks.Dequeue(); } } task(); } } public void QueueOnMainThread(Action task) { lock (tasks) { tasks.Enqueue(task); } } private void StartClient() { try { IPAddress ipAddress = IPAddress.Parse("127.0.0.1"); IPEndPoint remoteEP = new IPEndPoint(ipAddress, PORT_NUM); Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); // Connect to the remote endpoint. client.BeginConnect(remoteEP, new AsyncCallback(ConnectCallback), client); connectDone.WaitOne(); Receive(client); receiveDone.WaitOne(); Console.WriteLine("Response received : {0}", response); // Release the socket client.Shutdown(SocketShutdown.Both); client.Close(); } catch (Exception e) { Debug.Log(e.ToString()); } } private void ConnectCallback(IAsyncResult ar) { try { // Retrieve the socket from the state object. Socket client = (Socket)ar.AsyncState; // Complete the connection. client.EndConnect(ar); // Signal that the connection has been made. connectDone.Set(); } catch (Exception e) { String error = e.ToString(); Console.WriteLine(e.ToString()); fnDisconnect(); } } private void Receive(Socket client) { try { // Create the state object. StateObject state = new StateObject(); state.workSocket = client; bytesRead = 0; // Begin receiving the data from the remote device. client.BeginReceive(state.buffer, bytesRead, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state); } catch (Exception e) { Console.WriteLine(e.ToString()); } } private void ReceiveCallback(IAsyncResult ar) { try { // Retrieve the state object and the client socket // from the asynchronous state object. StateObject state = (StateObject)ar.AsyncState; Socket client = state.workSocket; // Read data from the remote device. int numOfBytesRead = client.EndReceive(ar); if (numOfBytesRead > 0) { bytesRead += numOfBytesRead; if (bytesRead == StateObject.BufferSize) { this.QueueOnMainThread(() => { // All the data has arrived; put it in response. ProcessFrame((byte[])state.buffer.Clone()); }); Receive(client); } else { // Get the rest of the data. client.BeginReceive(state.buffer, bytesRead, StateObject.BufferSize - bytesRead, 0, new AsyncCallback(ReceiveCallback), state); } } else { receiveDone.Set(); } } catch (Exception e) { Console.WriteLine(e.ToString()); } }
代码---数据处理
private void ProcessFrame(byte[] buffer) { byte[] bufferCopy = (byte[])buffer.Clone(); double[,,] XYZArray = new double[IMGH, IMGW, 3]; byte[,] DepthArray = new byte[IMGH, IMGW]; byte[,,] RGBArray = new byte[IMGH, IMGW, 3]; for (int i = 0; i < IMGW; i++) { for (int j = 0; j < IMGH; j++) { int index = (i * IMGW + j) * 20; //byte[] arr = {bufferCopy[index], bufferCopy[index + 1], bufferCopy[index + 2], bufferCopy[index + 3] }; float v = System.BitConverter.ToSingle(bufferCopy, index); if (!float.IsInfinity(v) && !float.IsNaN(v)) { XYZArray[i, j, 0] = v; } //arr = new byte[]{bufferCopy[index + 4], bufferCopy[index + 5], bufferCopy[index + 6], bufferCopy[index + 7] }; v = System.BitConverter.ToSingle(bufferCopy, index + 4); if (!float.IsInfinity(v) && !float.IsNaN(v)) { XYZArray[i, j, 1] = v; } v = System.BitConverter.ToSingle(bufferCopy, index + 8); if (!float.IsInfinity(v) && !float.IsNaN(v)) { XYZArray[i, j, 2] = v; } // Debug.Log("for loop called"); DepthArray[i, j] = bufferCopy[index + 12]; RGBArray[i, j, 2] = bufferCopy[index + 16]; // B RGBArray[i, j, 1] = bufferCopy[index + 17]; // G RGBArray[i, j, 0] = bufferCopy[index + 18]; // R } } this.EnQueue(XYZArray); } private void EnQueue(System.Object obj) { lock (queueLock) { queue.Enqueue(obj); } } private bool DeQueue(System.Object outObj) { bool success = false; lock (queueLock) { if (queue.Count > 0) { outObj = queue.Dequeue(); success = true; } } return success; } public int lengthOfQueue() { int count = -1; lock (queueLock) { count = queue.Count; } return count; } public double[,,] getXYZArray() { double[,,] retVal = new double[,,] { }; this.DeQueue(retVal); return retVal; }
更新
感谢@Programmer 的建议,我按照 link (s) 他提供的方法获得了一个可用的套接字客户端。
整个代码一团糟。您应该使用 Thread
而不是 Async
来以更少的代码完成该操作。
任何方式,将 byte[] bufferCopy = (byte[])buffer.Clone();
行代码替换为
byte[] bufferCopy = new byte[buffer.Length];
System.Buffer.BlockCopy(buffer, 0, bufferCopy, 0, bufferCopy.Length);
而对于 ProcessFrame((byte[])state.buffer.Clone());
,只需传入数据而不克隆它。所以应该用
ProcessFrame(state.buffer);
假设这是您代码中的唯一问题,这应该可以解决问题。
编辑: