在 Dataflow 中手动发送 PubSub 消息

Sending PubSub message manually in Dataflow

如何在 Dataflow 中手动发送 PubSub 消息(也就是说,不使用 PubsubIO)?

通过 Maven 导入 google-cloud-dataflow-java-sdk-all 2.5.0 时,它已经引入了某个版本的 com.google.pubsub.v1,但我找不到一种简单的方法用它把消息发送到 Pub/Sub 主题(例如,这个版本不允许操作 Publisher 实例,而官方文档描述的正是这种方式)。

您会考虑使用 PubsubUnboundedSink 吗?快速示例:

import java.nio.charset.StandardCharsets;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;


/**
 * Minimal pipeline that publishes one string to a Pub/Sub topic without {@code PubsubIO},
 * by applying {@link PubsubUnboundedSink} directly.
 */
public class PubsubTest {

    /**
     * Builds and runs the pipeline.
     *
     * @param args pipeline options in {@code --name=value} form; the {@code --project} option
     *     selects the project that owns the {@code output_topic} topic
     */
    public static void main(String[] args) {

        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
                        .as(DataflowPipelineOptions.class);

        // writes message to "output_topic" in the project given by the options
        TopicPath topic = PubsubClient.topicPathFromName(options.getProject(), "output_topic");

        Pipeline p = Pipeline.create(options);

        p
        .apply("input string", Create.of("This is just a message"))
        .apply("convert to Pub/Sub message", ParDo.of(new DoFn<String, PubsubMessage>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                // Encode explicitly as UTF-8: the no-arg String.getBytes() uses the worker's
                // platform default charset, which is not guaranteed to be UTF-8.
                // No custom attributes are attached (null attribute map).
                c.output(new PubsubMessage(
                        c.element().getBytes(StandardCharsets.UTF_8), null));
            }
        }))
        .apply("write to topic", new PubsubUnboundedSink(
            PubsubJsonClient.FACTORY,
            StaticValueProvider.of(topic), // topic
            "timestamp", // timestamp attribute
            "id", // ID attribute
            5 // number of shards
        ));

        p.run();
    }
}

这是我在浏览 https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-java/blob/master/dataflow/src/main/java/com/google/cloud/dataflow/examples/StockInjector.java 时找到的一种方法:

import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.model.PublishRequest;
import com.google.api.services.pubsub.model.PubsubMessage;
public class PubsubManager {
    private static final Logger logger = LoggerFactory.getLogger(PubsubManager.class);
    private static final JsonFactory JSON_FACTORY = JacksonFactory.getDefaultInstance();
    private static final Pubsub pubsub = createPubsubClient();

    public static class RetryHttpInitializerWrapper implements HttpRequestInitializer {

        // Intercepts the request for filling in the "Authorization"
        // header field, as well as recovering from certain unsuccessful
        // error codes wherein the Credential must refresh its token for a
        // retry.
        private final GoogleCredential wrappedCredential;

        // A sleeper; you can replace it with a mock in your test.
        private final Sleeper sleeper;

        private RetryHttpInitializerWrapper(GoogleCredential wrappedCredential) {
            this(wrappedCredential, Sleeper.DEFAULT);
        }

        // Use only for testing.
        RetryHttpInitializerWrapper(
                GoogleCredential wrappedCredential, Sleeper sleeper) {
            this.wrappedCredential = Preconditions.checkNotNull(wrappedCredential);
            this.sleeper = sleeper;
        }

        @Override
        public void initialize(HttpRequest request) {
            final HttpUnsuccessfulResponseHandler backoffHandler =
                    new HttpBackOffUnsuccessfulResponseHandler(
                            new ExponentialBackOff())
                            .setSleeper(sleeper);
            request.setInterceptor(wrappedCredential);
            request.setUnsuccessfulResponseHandler(
                    new HttpUnsuccessfulResponseHandler() {
                        @Override
                        public boolean handleResponse(HttpRequest request,
                                                      HttpResponse response,
                                                      boolean supportsRetry)
                                throws IOException {
                            if (wrappedCredential.handleResponse(request,
                                    response,
                                    supportsRetry)) {
                                // If credential decides it can handle it, the
                                // return code or message indicated something
                                // specific to authentication, and no backoff is
                                // desired.
                                return true;
                            } else if (backoffHandler.handleResponse(request,
                                    response,
                                    supportsRetry)) {
                                // Otherwise, we defer to the judgement of our
                                // internal backoff handler.
                                logger.info("Retrying " + request.getUrl());
                                return true;
                            } else {
                                return false;
                            }
                        }
                    });
            request.setIOExceptionHandler(new HttpBackOffIOExceptionHandler(
                    new ExponentialBackOff()).setSleeper(sleeper));
        }
    }

    /**
     * Creates a Cloud Pub/Sub client.
     */
    private static Pubsub createPubsubClient() {
        try {
            HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
            GoogleCredential credential = GoogleCredential.getApplicationDefault();
            HttpRequestInitializer initializer =
                    new RetryHttpInitializerWrapper(credential);
            return new Pubsub.Builder(transport, JSON_FACTORY, initializer).build();
        } catch (IOException | GeneralSecurityException e) {
            logger.error("Could not create Pubsub client: " + e);
        }
        return null;
    }

    /**
     * Publishes the given message to a Cloud Pub/Sub topic.
     */
    public static void publishMessage(String message, String outputTopic) {
        int maxLogMessageLength = 200;
        if (message.length() < maxLogMessageLength) {
            maxLogMessageLength = message.length();
        }
        logger.info("Received ...." + message.substring(0, maxLogMessageLength));

        // Publish message to Pubsub.
        PubsubMessage pubsubMessage = new PubsubMessage();
        pubsubMessage.encodeData(message.getBytes());

        PublishRequest publishRequest = new PublishRequest();
        publishRequest.setMessages(Collections.singletonList(pubsubMessage));
        try {
            pubsub.projects().topics().publish(outputTopic, publishRequest).execute();
        } catch (java.io.IOException e) {
            logger.error("Stuff happened in pubsub: " + e);
        }
    }
}

您也可以使用 PubsubIO.writeMessages() 方法发送 Pub/Sub 消息(注意:这种方式仍然使用了 PubsubIO,并不满足问题中“不使用 PubsubIO”的要求)。

Dataflow 管道步骤:

// Pipeline for the PubsubIO-based approach. Fn.method1, Fn.method2 and PubSubConfig appear to
// be placeholders for the reader's own code (not shown here).
// NOTE(review): PubsubIO.writeMessages() writes PubsubMessage elements, so "Transformer2" is
// presumably expected to output PubsubMessage — confirm against Fn.method2.
Pipeline p = Pipeline.create(options);
   p.apply("Transformer1", ParDo.of(new Fn.method1()))
    .apply("Transformer2", ParDo.of(new Fn.method2()))
    // Target topic is derived from the project and a custom pubsubTopic pipeline option.
    // Assumes getTopic returns a full topic name ("projects/<project>/topics/<topic>") — TODO confirm.
    .apply("PubsubMessageSend", PubsubIO.writeMessages().to(PubSubConfig.getTopic(options.getProject(), options.getpubsubTopic ())));

在 PipelineOptions 中定义项目名称(project)和 pubsubTopic 选项,用于指定接收 Pub/Sub 消息的目标主题。