原创

Spring Boot集成starrocks快速入门Demo

1.什么是starrocks?

StarRocks 是新一代极速全场景 MPP (Massively Parallel Processing) 数据库。StarRocks 的愿景是能够让用户的数据分析变得更加简单和敏捷。用户无需经过复杂的预处理,就可以用 StarRocks 来支持多种数据分析场景的极速分析。 StarRocks 架构简洁,采用了全面向量化引擎,并配备全新设计的 CBO (Cost Based Optimizer) 优化器,查询速度(尤其是多表关联查询)远超同类产品。 StarRocks 能很好地支持实时数据分析,并能实现对实时更新数据的高效查询。StarRocks 还支持现代化物化视图,进一步加速查询。 使用 StarRocks,用户可以灵活构建包括大宽表、星型模型、雪花模型在内的各类模型。 StarRocks 兼容 MySQL 协议,支持标准 SQL 语法,易于对接使用,全系统无外部依赖,高可用,易于运维管理。StarRocks 还兼容多种主流 BI 产品,包括 Tableau、Power BI、FineBI 和 Smartbi。 StarRocks 是 Linux 基金会项目,采用 Apache 2.0 许可证,可在 StarRocks GitHub 存储库中找到(请参阅 StarRocks 许可证)。StarRocks(i)链接或调用第三方软件库中的函数,其许可证可在 licenses-binary 文件夹中找到;和(ii)包含第三方软件代码,其许可证可在 licenses 文件夹中找到。

适用场景

StarRocks 可以满足企业级用户的多种分析需求,包括 OLAP (Online Analytical Processing) 多维分析、定制报表、实时数据分析和 Ad-hoc 数据分析等。

OLAP 多维分析

利用 StarRocks 的 MPP 框架和向量化执行引擎,用户可以灵活的选择雪花模型,星型模型,宽表模型或者预聚合模型。适用于灵活配置的多维分析报表,业务场景包括:
  • 用户行为分析
  • 用户画像、标签分析、圈人
  • 高维业务指标报表
  • 自助式报表平台
  • 业务问题探查分析
  • 跨主题业务分析
  • 财务报表
  • 系统监控分析

实时数据仓库

StarRocks 设计和实现了主键表,能够实时更新数据并极速查询,可以秒级同步 TP (Transaction Processing) 数据库的变化,构建实时数仓,业务场景包括:
  • 电商大促数据分析
  • 物流行业的运单分析
  • 金融行业绩效分析、指标计算
  • 直播质量分析
  • 广告投放分析
  • 管理驾驶舱
  • 探针分析APM(Application Performance Management)

高并发查询

StarRocks 通过良好的数据分布特性,灵活的索引以及物化视图等特性,可以解决面向用户侧的分析场景,业务场景包括:
  • 广告主报表分析
  • 零售行业渠道人员分析
  • SaaS 行业面向用户分析报表
  • Dashboard 多页面分析

统一分析

  • 通过使用一套系统解决多维分析、高并发查询、预计算、实时分析查询等场景,降低系统复杂度和多技术栈开发与维护成本。
  • 使用 StarRocks 统一管理数据湖和数据仓库,将高并发和实时性要求很高的业务放在 StarRocks 中分析,也可以使用 External Catalog 和外部表进行数据湖上的分析。

存算一体架构

本地存储为实时查询提供了更低的查询延迟。 作为典型的大规模并行处理 (MPP) 数据库,StarRocks 支持存算一体架构。在这种架构中,BE 负责数据存储和计算。直接访问 BE 本地数据允许本地计算,避免了数据传输和复制,从而提供超快的查询和分析性能。该架构支持多副本数据存储,增强了集群处理高并发查询的能力并确保数据可靠性。非常适合追求最佳查询性能的场景。   shared-data-1fac1b5ab7d46bf34f67c93ecc8e6c28

2.环境搭建

采用docker搭建最简单的测试环境
docker run -p 9030:9030 -p 8030:8030 -p 8040:8040 -itd --name quickstart starrocks/allin1-ubuntu

333

3.代码工程

实验目的

  1. 测试mysql创建 修改 插入删除数据
  2. 用streamload导入数据

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>starrocks</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.48</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.3</version>
       </dependency>
    </dependencies>

</project>

mysql连接

/**
Copyright (c) 2021 Beijing Dingshi Zongheng Technology Co., Ltd. All rights reserved.

Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
      "License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing,
  software distributed under the License is distributed on an
  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  KIND, either express or implied.  See the License for the
  specific language governing permissions and limitations
  under the License.
**/

package com.et.starrocks.mysql;

import java.sql.*;

public class MysqlClient {

    public static void main(String[] args) {
        String host = "172.30.17.1";
        //query_port in fe.conf
        String port = "9030";
        String user = "root";
        //password is empty by default
        String password = "";

        //connect to starrocks
        Connection conn = null;
        try {
            conn = getConn(host, port, user, password, "");
        } catch (Exception e) {
            System.out.println("connect to starrocks failed");
            e.printStackTrace();
            return;
        }
        System.out.println("connect to starrocks successfully");

        //create statement
        Statement stmt = null;
        try {
            stmt = conn.createStatement();
        } catch (SQLException e) {
            System.out.println("create statement failed");
            e.printStackTrace();
            closeConn(conn);
            return;
        }
        System.out.println("create statement successfully");

        //create database
        try {
            stmt.execute("CREATE DATABASE IF NOT EXISTS db_test");
        } catch (SQLException e) {
            System.out.println("create database failed");
            e.printStackTrace();
            closeStmt(stmt);
            closeConn(conn);
            return;
        }
        System.out.println("create database successfully");

        //set db context
        try {
            stmt.execute("USE db_test");
        } catch (SQLException e) {
            System.out.println("set db context failed");
            e.printStackTrace();
            closeStmt(stmt);
            closeConn(conn);
            return;
        }
        System.out.println("set db context successfully");

        //create table
        try {
            stmt.execute("CREATE TABLE IF NOT EXISTS table_test(siteid INT, citycode SMALLINT, pv BIGINT SUM) " +
                    "AGGREGATE KEY(siteid, citycode) " +
                    "DISTRIBUTED BY HASH(siteid) BUCKETS 10 " +
                    "PROPERTIES(\"replication_num\" = \"1\")");
        } catch (Exception e) {
            System.out.println("create table failed");
            e.printStackTrace();
            closeStmt(stmt);
            closeConn(conn);
            return;
        }
        System.out.println("create table successfully");

        //insert data
        try {
            stmt.execute("INSERT INTO table_test values(1, 2, 3), (4, 5, 6), (1, 2, 4)");
        } catch (Exception e) {
            System.out.println("insert data failed");
            e.printStackTrace();
            closeStmt(stmt);
            closeConn(conn);
            return;
        }
        System.out.println("insert data successfully");

        //query data
        try {
            ResultSet result = stmt.executeQuery("SELECT * FROM table_test");
            System.out.println("data queried is :");
            while (result.next()) {
                int siteid = result.getInt("siteid");
                int citycode = result.getInt("citycode");
                int pv = result.getInt("pv");
                System.out.println("\t" + siteid + "\t" + citycode + "\t" + pv);
            }
        } catch (Exception e) {
            System.out.println("query data failed");
            e.printStackTrace();
            closeStmt(stmt);
            closeConn(conn);
            return;
        }

        //drop database
     /*   try {
            stmt.execute("DROP DATABASE IF EXISTS db_test");
        } catch (Exception e) {
            System.out.println("drop database failed");
            e.printStackTrace();
            closeStmt(stmt);
            closeConn(conn);
            return;
        }*/
        System.out.println("drop database successfully");
        closeStmt(stmt);
        closeConn(conn);
    }

    public static Connection getConn(String host, String port, String user, String password, String database) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        String url = "jdbc:mysql://" + host + ":" + port + "/" + database + "?user=" + user + "&password=" + password;
        return DriverManager.getConnection(url);
    }

    public static void closeConn(Connection conn) {
        try {
            conn.close();
            System.out.println("conn closed");
        } catch (Exception e) {
            System.out.println("close conn failed");
            e.printStackTrace();
        }
    }

    public static void closeStmt(Statement stmt) {
        try {
            stmt.close();
            System.out.println("stmt closed");
        } catch (Exception e) {
            System.out.println("close stmt failed");
            e.printStackTrace();
        }
    }
}

streamload方式

package com.et.starrocks.streamload;// Copyright (c) 2021 Beijing Dingshi Zongheng Technology Co., Ltd. All rights reserved.
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * This class is a java demo for starrocks stream load
 *
 * The pom.xml dependency:
 *
 *         <dependency>
 *             <groupId>org.apache.httpcomponents</groupId>
 *             <artifactId>httpclient</artifactId>
 *             <version>4.5.3</version>
 *         </dependency>
 *
 * How to use:
 *
 * 1 create a table in starrocks with any mysql client
 *
 * CREATE TABLE `stream_test` (
 *   `id` bigint(20) COMMENT "",
 *   `id2` bigint(20) COMMENT "",
 *   `username` varchar(32) COMMENT ""
 * ) ENGINE=OLAP
 * DUPLICATE KEY(`id`)
 * DISTRIBUTED BY HASH(`id`) BUCKETS 20;
 *
 *
 * 2 change the StarRocks cluster, db, user config in this class
 *
 * 3 run this class, you should see the following output:
 *
 * {
 *     "TxnId": 27,
 *     "Label": "39c25a5c-7000-496e-a98e-348a264c81de",
 *     "Status": "Success",
 *     "Message": "OK",
 *     "NumberTotalRows": 10,
 *     "NumberLoadedRows": 10,
 *     "NumberFilteredRows": 0,
 *     "NumberUnselectedRows": 0,
 *     "LoadBytes": 50,
 *     "LoadTimeMs": 151
 * }
 *
 * Attention:
 *
 * 1 wrong dependency version(such as 4.4) of httpclient may cause shaded.org.apache.http.ProtocolException
 *   Caused by: shaded.org.apache.http.ProtocolException: Content-Length header already present
 *     at shaded.org.apache.http.protocol.RequestContent.process(RequestContent.java:96)
 *     at shaded.org.apache.http.protocol.ImmutableHttpProcessor.process(ImmutableHttpProcessor.java:132)
 *     at shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:182)
 *     at shaded.org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
 *     at shaded.org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
 *     at shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
 *
 *2 run this class more than once, the status code for http response is still ok, and you will see
 *  the following output:
 *
 * {
 *     "TxnId": -1,
 *     "Label": "39c25a5c-7000-496e-a98e-348a264c81de",
 *     "Status": "Label Already Exists",
 *     "ExistingJobStatus": "FINISHED",
 *     "Message": "Label [39c25a5c-7000-496e-a98e-348a264c81de"] has already been used.",
 *     "NumberTotalRows": 0,
 *     "NumberLoadedRows": 0,
 *     "NumberFilteredRows": 0,
 *     "NumberUnselectedRows": 0,
 *     "LoadBytes": 0,
 *     "LoadTimeMs": 0
 * }
 * 3 when the response statusCode is 200, that doesn't mean your stream load is ok, there may be still
 *   some stream problem unless you see the output with 'ok' message
 */
public class StarRocksStreamLoad {
    private final static String STARROCKS_HOST = "127.0.0.1";
    private final static String STARROCKS_DB = "db_test";
    private final static String STARROCKS_TABLE = "stream_test";
    private final static String STARROCKS_USER = "root";
    private final static String STARROCKS_PASSWORD = "";
    private final static int STARROCKS_HTTP_PORT = 8040;

    private void sendData(String content) throws Exception {
        final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
                STARROCKS_HOST,
                STARROCKS_HTTP_PORT,
                STARROCKS_DB,
                STARROCKS_TABLE);

        final HttpClientBuilder httpClientBuilder = HttpClients
                .custom()
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                });

        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            StringEntity entity = new StringEntity(content, "UTF-8");
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(STARROCKS_USER, STARROCKS_PASSWORD));
            // the label header is optional, not necessary
            // use label header can ensure at most once semantics
            put.setHeader("label", "39c25a5c-7000-496e-a98e-348a264c81de1");
            put.setEntity(entity);

            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }
                final int statusCode = response.getStatusLine().getStatusCode();
                // statusCode 200 just indicates that starrocks be service is ok, not stream load
                // you should see the output content to find whether stream load is success
                if (statusCode != 200) {
                    throw new IOException(
                            String.format("Stream load failed, statusCode=%s load result=%s", statusCode, loadResult));
                }

                System.out.println(loadResult);
            }
        }
    }

    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }

    public static void main(String[] args) throws Exception {
        int id1 = 1;
        int id2 = 10;
        String id3 = "Simon";
        int rowNumber = 10;
        String oneRow = id1 + "\t" + id2 + "\t" + id3 + "\n";

        StringBuilder stringBuilder = new StringBuilder();
        for (int i = 0; i < rowNumber; i++) {
            stringBuilder.append(oneRow);
        }
        
        stringBuilder.deleteCharAt(stringBuilder.length() - 1);

        String loadData = stringBuilder.toString();
        StarRocksStreamLoad starrocksStreamLoad = new StarRocksStreamLoad();
        starrocksStreamLoad.sendData(loadData);
    }
}

4.测试

MysqlClient

启动main方法,可以看到执行成功
connect to starrocks successfully
create statement successfully
create database successfully
set db context successfully
create table successfully
insert data successfully
data queried is :
 1 2 7
 4 5 6
drop database successfully
stmt closed
conn closed

StarRocksStreamLoad

启动main方法,可以看到插入成功
20:51:47.521 [main] DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager - Connection [id: 0][route: {}->http://127.0.0.1:8040] can be kept alive indefinitely
20:51:47.521 [main] DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager - Connection released: [id: 0][route: {}->http://127.0.0.1:8040][total kept alive: 1; route allocated: 1 of 2; total allocated: 1 of 20]
{
 "TxnId": 2,
 "Label": "39c25a5c-7000-496e-a98e-348a264c81de1",
 "Status": "Success",
 "Message": "OK",
 "NumberTotalRows": 10,
 "NumberLoadedRows": 10,
 "NumberFilteredRows": 0,
 "NumberUnselectedRows": 0,
 "LoadBytes": 109,
 "LoadTimeMs": 975,
 "BeginTxnTimeMs": 261,
 "StreamLoadPlanTimeMs": 342,
 "ReadDataTimeMs": 0,
 "WriteDataTimeMs": 106,
 "CommitAndPublishTimeMs": 259
}

5.引用

 
正文到此结束
Loading...