原创

Spring Cloud 3.x 集成 BigQuery

1.原理

Google BigQuery 是一种高性能、可应用于大数据分析的公主云数据库服务。Spring Cloud 提供了完善的工具和核心功能,可以进行泛动分布应用构建。通过集成 Spring Cloud GCP,应用可以便捷地使用 Google 云服务,如 BigQuery。 运行原理如下:
  1. Spring Cloud GCP 提供对 Google Cloud SDK 和 REST API 的封装,通过自定义配置简化了通信流程。
  2. 通过 BigQueryTemplate,应用可以实现数据提交、查询和分析。
  3. 使用 Spring 框架的常规构件,如信息通道和任务调度,应用可以进行规模化数据处理。

2.应用场景

  1. 大数据分析: 选择 BigQuery 进行大量数据的高性能分析,完善商业准备和内容提报。
  2. ETL 操作: 连接多种数据源,通过规则创建云数据分析的数据统一。
  3. BI 图表: 使用 BigQuery 提供的高速查询功能,支持 BI 平台实现动态绘图和数据分析。
  4. 可视化报表: 在进行精简数据计算后,展示对外分析结果。

3.环境创建

BigQuery 信息中心,为自己创建一个 BigQuery 数据集。在资源面板下,点击您的项目 ID,然后点击项目下的创建数据集。 bigquery-dashboard

4.代码实现

1. 配置环境

1.1 创建项目

使用 Spring Initializr 创建 Spring Boot 项目,选择以下依赖:
  • Spring Web
  • Spring Boot Actuator
  • Spring Cloud GCP

1.2 加入依赖

pom.xml 中添加以下依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring-cloud-gcp</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spring-cloud-gcp-bigquery-sample</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-starter-bigquery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>

        <!-- Test-related dependencies. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

2. 配置文件

2.1 application.properties

spring.cloud.gcp.bigquery.dataset-name=test_dataset
spring.cloud.gcp.bigquery.project-id=feisty-truth-447013-m7
spring.cloud.gcp.bigquery.credentials.location=file:/path-to-key/keyfile.json
举例:请将 path-to-key/keyfile.json 替换为你的服务账户私钥文件路径。

2.2 IAM 访问权限

确保对应的服务账户具备以下角色:
  • BigQuery Data Viewer
  • BigQuery Job User
使用以下指令加入 IAM 访问权限:
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:YOUR_SERVICE_ACCOUNT_EMAIL" \
    --role="roles/bigquery.jobUser"
替换 PROJECT_IDYOUR_SERVICE_ACCOUNT_EMAIL 为你实际的项目 ID 和服务账户邮箱地址。

3. 实现逻辑

3.1 创建接口

将文件上传至 BigQuery
/*
 * Copyright 2017-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.et;

import com.et.BigQuerySampleConfiguration.BigQueryFileGateway;
import com.google.cloud.bigquery.*;
import com.google.cloud.spring.bigquery.core.BigQueryTemplate;
import com.google.cloud.spring.bigquery.core.WriteApiResponse;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.ui.ModelMap;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.servlet.ModelAndView;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/** Provides REST endpoint allowing you to load data files to BigQuery using Spring Integration. */
@Controller
public class WebController {

  private final BigQueryFileGateway bigQueryFileGateway;

  private final BigQueryTemplate bigQueryTemplate;

  private static final String DATASET_NAME = "datasetName";

  @Value("${spring.cloud.gcp.bigquery.datasetName}")
  private String datasetName;

  public WebController(BigQueryFileGateway bigQueryFileGateway,
      BigQueryTemplate bigQueryTemplate) {
    this.bigQueryFileGateway = bigQueryFileGateway;
    this.bigQueryTemplate = bigQueryTemplate;
  }

  @GetMapping("/")
  public ModelAndView renderIndex(ModelMap map) {
    map.put(DATASET_NAME, this.datasetName);
    return new ModelAndView("index.html", map);
  }

  @GetMapping("/write-api-json-upload")
  public ModelAndView renderUploadJson(ModelMap map) {
    map.put(DATASET_NAME, this.datasetName);
    return new ModelAndView("upload-json.html", map);
  }

  /**
   * Handles a file upload using {@link BigQueryTemplate}.
   *
   * @param file the JSON file to upload to BigQuery
   * @param tableName name of the table to load data into
   * @return ModelAndView of the response the send back to users
   * @throws IOException if the file is unable to be loaded.
   */
  @PostMapping("/uploadJsonFile")
  public ModelAndView handleJsonFileUpload(
      @RequestParam("file") MultipartFile file,
      @RequestParam("tableName") String tableName,
      @RequestParam(name = "createTable", required = false) String createDefaultTable)
      throws IOException {
    CompletableFuture<WriteApiResponse> writeApiRes;
    if (createDefaultTable != null
        && createDefaultTable.equals("createTable")) { // create the default table
      writeApiRes =
          this.bigQueryTemplate.writeJsonStream(
              tableName, file.getInputStream(), getDefaultSchema());
    } else { // we are expecting the table to be already existing
      writeApiRes = this.bigQueryTemplate.writeJsonStream(tableName, file.getInputStream());
    }
    return getWriteApiResponse(writeApiRes, tableName);
  }

  private Schema getDefaultSchema() {
    return Schema.of(
        Field.of("CompanyName", StandardSQLTypeName.STRING),
        Field.of("Description", StandardSQLTypeName.STRING),
        Field.of("SerialNumber", StandardSQLTypeName.NUMERIC),
        Field.of("Leave", StandardSQLTypeName.NUMERIC),
        Field.of("EmpName", StandardSQLTypeName.STRING));
  }

  /**
   * Handles JSON data upload using using {@link BigQueryTemplate}.
   *
   * @param jsonRows the String JSON data to upload to BigQuery
   * @param tableName name of the table to load data into
   * @return ModelAndView of the response the send back to users
   */
  @PostMapping("/uploadJsonText")
  public ModelAndView handleJsonTextUpload(
      @RequestParam("jsonRows") String jsonRows,
      @RequestParam("tableName") String tableName,
      @RequestParam(name = "createTable", required = false) String createDefaultTable) {
    CompletableFuture<WriteApiResponse> writeApiRes;
    if (createDefaultTable != null
        && createDefaultTable.equals("createTable")) { // create the default table

      writeApiRes =
          this.bigQueryTemplate.writeJsonStream(
              tableName, new ByteArrayInputStream(jsonRows.getBytes()), getDefaultSchema());
    } else { // we are expecting the table to be already existing
      writeApiRes =
          this.bigQueryTemplate.writeJsonStream(
              tableName, new ByteArrayInputStream(jsonRows.getBytes()));
    }
    return getWriteApiResponse(writeApiRes, tableName);
  }

  private ModelAndView getWriteApiResponse(
      CompletableFuture<WriteApiResponse> writeApiFuture, String tableName) {
    String message = null;
    try {
      WriteApiResponse apiResponse = writeApiFuture.get();
      if (apiResponse.isSuccessful()) {
        message = "Successfully loaded data to " + tableName;
      } else if (apiResponse.getErrors() != null && !apiResponse.getErrors().isEmpty()) {
        message =
            String.format(
                "Error occurred while loading the file, printing first error %s. Use WriteApiResponse.getErrors() to get the complete list of errors",
                apiResponse.getErrors().get(0).getErrorMessage());
      }

    } catch (Exception e) {
      e.printStackTrace();
      message = "Error: " + e.getMessage();
    }
    return new ModelAndView("upload-json.html")
        .addObject(DATASET_NAME, this.datasetName)
        .addObject("message", message);
  }

  /**
   * Handles a file upload using {@link BigQueryTemplate}.
   *
   * @param file the CSV file to upload to BigQuery
   * @param tableName name of the table to load data into
   * @return ModelAndView of the response to send back to users
   * @throws IOException if the file is unable to be loaded.
   */
  @PostMapping("/uploadFile")
  public ModelAndView handleFileUpload(
      @RequestParam("file") MultipartFile file, @RequestParam("tableName") String tableName)
      throws IOException {

    CompletableFuture<Job> loadJob =
        this.bigQueryTemplate.writeDataToTable(
            tableName, file.getInputStream(), FormatOptions.csv());

    return getResponse(loadJob, tableName);
  }

  /**
   * Handles CSV data upload using Spring Integration {@link BigQueryFileGateway}.
   *
   * @param csvData the String CSV data to upload to BigQuery
   * @param tableName name of the table to load data into
   * @return ModelAndView of the response the send back to users
   */
  @PostMapping("/uploadCsvText")
  public ModelAndView handleCsvTextUpload(
      @RequestParam("csvText") String csvData, @RequestParam("tableName") String tableName) {

    CompletableFuture<Job> loadJob =
        this.bigQueryFileGateway.writeToBigQueryTable(csvData.getBytes(), tableName);

    return getResponse(loadJob, tableName);
  }

  private ModelAndView getResponse(CompletableFuture<Job> loadJob, String tableName) {
    String message;
    try {
      Job job = loadJob.get();
      message = "Successfully loaded data file to " + tableName;
    } catch (Exception e) {
      e.printStackTrace();
      message = "Error: " + e.getMessage();
    }

    return new ModelAndView("index")
        .addObject(DATASET_NAME, this.datasetName)
        .addObject("message", message);
  }
}

5.测试

在google cloud shell里面运行代码 运行 $ mvn spring-boot:run 命令。 cloud-shell 单击 Cloud Shell 中的 Web Preview 按钮以在端口 8080 上预览应用,并尝试将一些数据加载到数据集下的 BigQuery 表中。该应用程序接受 CSV 文件上传或输入到文本区域的 CSV 数据。如果 BigQuery 数据集下尚不存在该表,则会为您创建该表。 8080 查看导入结果 test-dataset

6.引用

 
正文到此结束
Loading...