客户端在调用 Azure 事件中心并面临连接错误时挂起
Client hangs when calling Azure Event Hub and facing connection error
我想将事件消息发送到 Azure 事件中心。我注意到如果我配置错误,我的应用程序就会挂起而不是终止。
我写了一个非常简单的 Java class 尝试将事件消息发送到事件中心。如果我错误地输入了事件中心的终结点,则应用程序会挂起。这真是令人失望。
我有可能误解了一些东西,但我想做的只是发送一个简单的消息,仅此而已。我该怎么做?
ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder();
connectionStringBuilder
.setEndpoint(URI.create("https://XXXXXXXXX.servsssicebus.windows.net"))
.setTransportType(TransportType.AMQP_WEB_SOCKETS)
.setSasKeyName("XXX")
.setSasKey("XXX");
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
final EventHubClient ehClient =
EventHubClient.createFromConnectionStringSync(
connectionStringBuilder.toString(),
RetryPolicy.getNoRetry(),
scheduledExecutorService
);
ehClient.sendSync(EventData.create("Test Message".getBytes()));
ehClient.closeSync();
scheduledExecutorService.shutdown();
我使用以下依赖项:
compile "com.microsoft.azure:azure-eventhubs:3.2.0"
如有任何帮助,我将不胜感激!
谢谢!
我用Maven创建了一个java项目,然后在pom.xml中添加依赖:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
这是发送事件消息的代码:
package test;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class App
{
public static void main( String[] args ) throws Exception
{
final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
.setNamespaceName("testbowman")
.setEventHubName("test")
.setSasKeyName("testbowman")
.setSasKey("xxxxxx");
final Gson gson = new GsonBuilder().create();
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);
try {
for (int i = 0; i < 10; i++) {
String payload = "Message " + Integer.toString(i);
byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.defaultCharset());
EventData sendEvent = EventData.create(payloadBytes);
ehClient.sendSync(sendEvent);
}
System.out.println(Instant.now() + ": Send Complete...");
System.out.println("Press Enter to stop.");
System.in.read();
} finally {
ehClient.closeSync();
executorService.shutdown();
}
System.out.println( "Hello World!" );
System.out.println( "!!!!!!!!!!!!!" );
}
}
(我隐藏了 sas 密钥,我想你知道从哪里获得 sas 密钥。:))
终于可以在metrics上看到消息进来了(暂时看不到,需要等一段时间。):
这是官方文档:
编辑:
我认为您挂起的原因是执行程序在出现错误时没有机会关闭。您应该将代码包装在 try finally 中,如下所示 :
ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder();
connectionStringBuilder
.setEndpoint(URI.create("https://XXXXXXXXX.servsssicebus.windows.net"))
.setTransportType(TransportType.AMQP_WEB_SOCKETS)
.setSasKeyName("XXX")
.setSasKey("XXX");
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
EventHubClient ehClient = null;
try {
ehClient =
EventHubClient.createFromConnectionStringSync(
connectionStringBuilder.toString(),
RetryPolicy.getNoRetry(),
scheduledExecutorService
);
ehClient.sendSync(EventData.create("Test Message".getBytes()));
}
finally {
if (ehClient != null)
ehClient.closeSync();
scheduledExecutorService.shutdown();
}
N.B。您的代码使用的是 azure-eventhubs 包 (Event Hub v3),如 this tutorial. The latest package azure-messaging-eventhubs (Event Hub v5 using Producer/Consumer pattern) has bit different APIs which is described in this tutorial 中所述。如果是新开发,你应该使用新的 SDK。
import com.azure.messaging.eventhubs.*;
public class Sender {
public static void main(String[] args) {
final String connectionString = "EVENT HUBS NAMESPACE CONNECTION STRING";
final String eventHubName = "EVENT HUB NAME";
// create a producer using the namespace connection string and event hub name
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.buildProducerClient();
// prepare a batch of events to send to the event hub
EventDataBatch batch = producer.createBatch();
batch.tryAdd(new EventData("First event"));
batch.tryAdd(new EventData("Second event"));
batch.tryAdd(new EventData("Third event"));
batch.tryAdd(new EventData("Fourth event"));
batch.tryAdd(new EventData("Fifth event"));
// send the batch of events to the event hub
producer.send(batch);
// close the producer
producer.close();
}
}
进一步说明,还有从 v3 到 v5 的迁移指南 here。
即使使用旧包,我也无法使用 Executors.newScheduledThreadPool(4) 或 Executors.newSingleThreadScheduledExecutor() 在开始时优雅地关闭 Executor 时重现您的挂起问题。如果我错误地提供了错误的连接字符串,它会立即抛出异常:线程“main”中的异常com.microsoft.azure.eventhubs.CommunicationException:发生通信错误。这可能是由于您的连接字符串中的主机名不正确或您的网络连接有问题。
我想将事件消息发送到 Azure 事件中心。我注意到如果我配置错误,我的应用程序就会挂起而不是终止。
我写了一个非常简单的 Java class 尝试将事件消息发送到事件中心。如果我错误地输入了事件中心的终结点,则应用程序会挂起。这真是令人失望。
我有可能误解了一些东西,但我想做的只是发送一个简单的消息,仅此而已。我该怎么做?
ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder();
connectionStringBuilder
.setEndpoint(URI.create("https://XXXXXXXXX.servsssicebus.windows.net"))
.setTransportType(TransportType.AMQP_WEB_SOCKETS)
.setSasKeyName("XXX")
.setSasKey("XXX");
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
final EventHubClient ehClient =
EventHubClient.createFromConnectionStringSync(
connectionStringBuilder.toString(),
RetryPolicy.getNoRetry(),
scheduledExecutorService
);
ehClient.sendSync(EventData.create("Test Message".getBytes()));
ehClient.closeSync();
scheduledExecutorService.shutdown();
我使用以下依赖项:
compile "com.microsoft.azure:azure-eventhubs:3.2.0"
如有任何帮助,我将不胜感激! 谢谢!
我用Maven创建了一个java项目,然后在pom.xml中添加依赖:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
这是发送事件消息的代码:
package test;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class App
{
public static void main( String[] args ) throws Exception
{
final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
.setNamespaceName("testbowman")
.setEventHubName("test")
.setSasKeyName("testbowman")
.setSasKey("xxxxxx");
final Gson gson = new GsonBuilder().create();
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);
try {
for (int i = 0; i < 10; i++) {
String payload = "Message " + Integer.toString(i);
byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.defaultCharset());
EventData sendEvent = EventData.create(payloadBytes);
ehClient.sendSync(sendEvent);
}
System.out.println(Instant.now() + ": Send Complete...");
System.out.println("Press Enter to stop.");
System.in.read();
} finally {
ehClient.closeSync();
executorService.shutdown();
}
System.out.println( "Hello World!" );
System.out.println( "!!!!!!!!!!!!!" );
}
}
(我隐藏了 sas 密钥,我想你知道从哪里获得 sas 密钥。:))
终于可以在metrics上看到消息进来了(暂时看不到,需要等一段时间。):
这是官方文档:
编辑:
我认为您挂起的原因是执行程序在出现错误时没有机会关闭。您应该将代码包装在 try finally 中,如下所示 :
ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder();
connectionStringBuilder
.setEndpoint(URI.create("https://XXXXXXXXX.servsssicebus.windows.net"))
.setTransportType(TransportType.AMQP_WEB_SOCKETS)
.setSasKeyName("XXX")
.setSasKey("XXX");
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
EventHubClient ehClient = null;
try {
ehClient =
EventHubClient.createFromConnectionStringSync(
connectionStringBuilder.toString(),
RetryPolicy.getNoRetry(),
scheduledExecutorService
);
ehClient.sendSync(EventData.create("Test Message".getBytes()));
}
finally {
if (ehClient != null)
ehClient.closeSync();
scheduledExecutorService.shutdown();
}
N.B。您的代码使用的是 azure-eventhubs 包 (Event Hub v3),如 this tutorial. The latest package azure-messaging-eventhubs (Event Hub v5 using Producer/Consumer pattern) has bit different APIs which is described in this tutorial 中所述。如果是新开发,你应该使用新的 SDK。
import com.azure.messaging.eventhubs.*;
public class Sender {
public static void main(String[] args) {
final String connectionString = "EVENT HUBS NAMESPACE CONNECTION STRING";
final String eventHubName = "EVENT HUB NAME";
// create a producer using the namespace connection string and event hub name
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.buildProducerClient();
// prepare a batch of events to send to the event hub
EventDataBatch batch = producer.createBatch();
batch.tryAdd(new EventData("First event"));
batch.tryAdd(new EventData("Second event"));
batch.tryAdd(new EventData("Third event"));
batch.tryAdd(new EventData("Fourth event"));
batch.tryAdd(new EventData("Fifth event"));
// send the batch of events to the event hub
producer.send(batch);
// close the producer
producer.close();
}
}
进一步说明,还有从 v3 到 v5 的迁移指南 here。
即使使用旧包,我也无法使用 Executors.newScheduledThreadPool(4) 或 Executors.newSingleThreadScheduledExecutor() 在开始时优雅地关闭 Executor 时重现您的挂起问题。如果我错误地提供了错误的连接字符串,它会立即抛出异常:线程“main”中的异常com.microsoft.azure.eventhubs.CommunicationException:发生通信错误。这可能是由于您的连接字符串中的主机名不正确或您的网络连接有问题。