从 Spring 控制器执行 Google Cloud Dataflow 管道

Execute Google Cloud Dataflow pipeline from Spring controller

我将如何使用 Spring 执行到 Google Cloud Dataflow 的 Apache Beam 管道?这个问题类似于 Running Apache Beam pipeline in Spring Boot project on Google Data Flow,但这个问题更多地关注从 Spring 控制器启动管道,而不是从 CommandLineRunner。

@RestController
@RequestMapping("/task/import-csv-file")
public class ImportCsvController {
    @PostMapping("/process-csv-file")
    private ResponseEntity<Void> processCsvFile(
            @RequestParam String gcsFileName,
            @RequestParam String bucketName
    ) {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

        options.setProject("same-project-as-this-app-engine-instance");
        options.setStagingLocation("gs://" + bucketName + "/binaries");
        options.setRunner(DataflowRunner.class);
        options.setJobname("process-csv");

        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply("ReadFile", TextIO.read().from("gs://" + bucketName + "/" + gcsFileName));
        // ... apply some more transforms here, which will eventually 
        // write csv rows as Google Datastore entities ...
        pipeline.run().waitUntilFinish();
        return ResponseEntity.ok().build();
    }
}

我是运行这个控制器,使用Google Cloud Tasks,使用下面的代码:

@Service
public class TaskQueueService {
    private Queue csvImportsQueue;

    public TaskQueueService() {
        this.csvImportsQueue = QueueFactory.getQueue("csv-import-queue");
    }

    public void queueImportCsvFile(String gcsFileName, String bucketName) {
        String url = "/task/import-csv-file/process-csv-file";
        TaskOptions taskOptions = TaskOptions.Builder
                .withUrl(url)
                .method(POST)
                .param("gcsFileName", gcsFileName)
                .param("bucketName", bucketName);
        queue.add(ofy().getTransaction(), taskOptions);
    }
}

来自 Google Cloud Logging,我得到了这个错误

java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)

在堆栈跟踪中,这条错误消息看起来很有用:

Caused by: java.lang.IllegalArgumentException: Missing required value for [public abstract java.lang.String org.apache.beam.runners.dataflow.options.DataflowPipelineOptions.getProject(), "Project id. Required when running a Dataflow in the cloud. See https://cloud.google.com/storage/docs/projects for further details."]. 

但是正如您在上面看到的,我确实使用 options.setProject("same-project-as-this-app-engine-instance");

行设置了项目

编辑:我发现了一个不同的堆栈跟踪,它有不同的错误消息。这是完整的堆栈跟踪。

java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
    at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
    at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:155)
    at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
    at org.apache.beam.sdk.Pipeline.create(Pipeline.java:149)
    at com.example.application.controllers.tasks.ImportCsvController.processCsvFile(ImportCsvController.java:64)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974)
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:877)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1772)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:320)
    at com.example.application.filters.SwitchUserProfileFilter.doFilter(SwitchUserProfileFilter.java:127)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:127)
    at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:91)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:119)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:170)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.saml.SAMLLogoutFilter.processLogout(SAMLLogoutFilter.java:168)
    at org.springframework.security.saml.SAMLLogoutFilter.doFilter(SAMLLogoutFilter.java:110)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.saml.SAMLDiscovery.doFilter(SAMLDiscovery.java:137)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.saml.SAMLLogoutProcessingFilter.processLogout(SAMLLogoutProcessingFilter.java:209)
    at org.springframework.security.saml.SAMLLogoutProcessingFilter.doFilter(SAMLLogoutProcessingFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.saml.SAMLEntryPoint.doFilter(SAMLEntryPoint.java:102)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.saml.metadata.MetadataDisplayFilter.doFilter(MetadataDisplayFilter.java:84)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.saml.metadata.MetadataGeneratorFilter.doFilter(MetadataGeneratorFilter.java:87)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:66)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at com.example.application.filters.CustomDomainFilter.doFilterInternal(CustomDomainFilter.java:43)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:215)
    at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178)
    at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:357)
    at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:270)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:357)
    at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:270)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at com.google.apphosting.utils.servlet.JdbcMySqlConnectionCleanupFilter.doFilter(JdbcMySqlConnectionCleanupFilter.java:60)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.springframework.boot.web.servlet.support.ErrorPageFilter.doFilter(ErrorPageFilter.java:117)
    at org.springframework.boot.web.servlet.support.ErrorPageFilter.access[=16=]0(ErrorPageFilter.java:61)
    at org.springframework.boot.web.servlet.support.ErrorPageFilter.doFilterInternal(ErrorPageFilter.java:92)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.boot.web.servlet.support.ErrorPageFilter.doFilter(ErrorPageFilter.java:110)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at com.googlecode.objectify.ObjectifyFilter.doFilter(ObjectifyFilter.java:48)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:582)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
    at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:524)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:226)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
    at com.google.apphosting.runtime.jetty9.ParseBlobUploadHandler.handle(ParseBlobUploadHandler.java:119)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1182)
    at com.google.apphosting.runtime.jetty9.AppEngineWebAppContext.doHandle(AppEngineWebAppContext.java:187)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at com.google.apphosting.runtime.jetty9.AppVersionHandlerMap.handle(AppVersionHandlerMap.java:293)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
    at org.eclipse.jetty.server.Server.handle(Server.java:539)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:333)
    at com.google.apphosting.runtime.jetty9.RpcConnection.handle(RpcConnection.java:213)
    at com.google.apphosting.runtime.jetty9.RpcConnector.serviceRequest(RpcConnector.java:81)
    at com.google.apphosting.runtime.jetty9.JettyServletEngineAdapter.serviceRequest(JettyServletEngineAdapter.java:134)
    at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.dispatchServletRequest(JavaRuntime.java:757)
    at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.dispatchRequest(JavaRuntime.java:720)
    at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.run(JavaRuntime.java:690)
    at com.google.apphosting.runtime.JavaRuntime$NullSandboxRequestRunnable.run(JavaRuntime.java:882)
    at com.google.apphosting.runtime.ThreadGroupPool$PoolEntry.run(ThreadGroupPool.java:270)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException: null
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:214)
    ... 123 common frames omitted
Caused by: java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.expireAfterWrite(Ljava/time/Duration;)Lcom/google/common/cache/CacheBuilder;
    at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.<init>(GoogleCloudStorageImpl.java:149)
    at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:243)
    at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:82)
    at org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:104)
    at org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:87)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:158)
    at com.sun.proxy.$Proxy164.getGcsUtil(Unknown Source)
    at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.tryCreateDefaultBucket(GcpOptions.java:354)
    at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:300)
    at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:288)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:158)
    at com.sun.proxy.$Proxy149.getGcpTempLocation(Unknown Source)
    at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:249)
    ... 128 common frames omitted

编辑 2:我在我的一个源文件中键入了这两个导入:

import com.google.common.cache.CacheBuilder;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl;

然后我按 Cmd+Click 进入他们在 Intellij 中的实现,我发现方法重载不存在(埋在上面的长堆栈跟踪中)。

~/.gradle/caches/modules-2/files-2.1/com.google.cloud.bigdataoss/gcsio/2.0.0/ba86cba5b74f7ded14feb682cc81ece7724573a7/gcsio-2.0.0-sources.jar!/com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl.java

private final LoadingCache<String, Boolean> autoBuckets =
    CacheBuilder.newBuilder()
        .expireAfterWrite(Duration.ofHours(1))
        .build(
            new CacheLoader<String, Boolean>() {
              final List<String> iamPermissions = ImmutableList.of("storage.buckets.get");

              @Override
              public Boolean load(String bucketName) throws Exception {
                try {
                  gcs.buckets()
                      .testIamPermissions(bucketName, iamPermissions)
                      .executeUnparsed()
                      .disconnect();
                } catch (IOException e) {
                  return errorExtractor.userProjectMissing(e);
                }
                return false;
              }
            });

~/.gradle/caches/modules-2/files-2.1/com.google.guava/guava/29.0-android/63f9bc5fbf2ebfe6b17683f8eac8419588295a28/guava-29.0-android-sources.jar!/com/google/common/cache/CacheBuilder.java

public CacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit) {
  checkState(
      expireAfterWriteNanos == UNSET_INT,
      "expireAfterWrite was already set to %s ns",
      expireAfterWriteNanos);
  checkArgument(duration >= 0, "duration cannot be negative: %s %s", duration, unit);
  this.expireAfterWriteNanos = unit.toNanos(duration);
  return this;
}

编辑 3:这是我使用的 Apache Beam 版本:

dependencies {
    compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.20.0'
    compile group: 'org.apache.beam', name: 'beam-runners-google-cloud-dataflow-java', version: '2.20.0'
}

要解决此问题,您应该使用非Android Guava 版本 - 29.0-jre,因为这是 GCSIO 库 depends on.

的 Guava 版本