本文主要研究一下flink JobManager的heap大小设置
flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/JobManagerOptions.java
@PublicEvolving public class JobManagerOptions { //...... /** * JVM heap size for the JobManager with memory size. */ @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY) public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY = key("jobmanager.heap.size") .defaultValue("1024m") .withDescription("JVM heap size for the JobManager."); /** * JVM heap size (in megabytes) for the JobManager. * @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY} */ @Deprecated public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB = key("jobmanager.heap.mb") .defaultValue(1024) .withDescription("JVM heap size (in megabytes) for the JobManager."); //...... }
flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/ConfigurationUtils.java
public class ConfigurationUtils { private static final String[] EMPTY = new String[0]; /** * Get job manager's heap memory. This method will check the new key * {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and * the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for backwards compatibility. * * @param configuration the configuration object * @return the memory size of job manager's heap memory. */ public static MemorySize getJobManagerHeapMemory(Configuration configuration) { if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) { return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)); } else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) { return MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m"); } else { //use default value return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)); } } //...... }
flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/MemorySize.java
@PublicEvolving public class MemorySize implements java.io.Serializable { private static final long serialVersionUID = 1L; // ------------------------------------------------------------------------ /** The memory size, in bytes. */ private final long bytes; /** * Constructs a new MemorySize. * * @param bytes The size, in bytes. Must be zero or larger. */ public MemorySize(long bytes) { checkArgument(bytes >= 0, "bytes must be >= 0"); this.bytes = bytes; } // ------------------------------------------------------------------------ /** * Gets the memory size in bytes. */ public long getBytes() { return bytes; } /** * Gets the memory size in Kibibytes (= 1024 bytes). */ public long getKibiBytes() { return bytes >> 10; } /** * Gets the memory size in Mebibytes (= 1024 Kibibytes). */ public int getMebiBytes() { return (int) (bytes >> 20); } /** * Gets the memory size in Gibibytes (= 1024 Mebibytes). */ public long getGibiBytes() { return bytes >> 30; } /** * Gets the memory size in Tebibytes (= 1024 Gibibytes). */ public long getTebiBytes() { return bytes >> 40; } // ------------------------------------------------------------------------ @Override public int hashCode() { return (int) (bytes ^ (bytes >>> 32)); } @Override public boolean equals(Object obj) { return obj == this || (obj != null && obj.getClass() == this.getClass() && ((MemorySize) obj).bytes == this.bytes); } @Override public String toString() { return bytes + " bytes"; } // ------------------------------------------------------------------------ // Parsing // ------------------------------------------------------------------------ /** * Parses the given string as as MemorySize. * * @param text The string to parse * @return The parsed MemorySize * * @throws IllegalArgumentException Thrown, if the expression cannot be parsed. */ public static MemorySize parse(String text) throws IllegalArgumentException { return new MemorySize(parseBytes(text)); } /** * Parses the given string with a default unit. 
* * @param text The string to parse. * @param defaultUnit specify the default unit. * @return The parsed MemorySize. * * @throws IllegalArgumentException Thrown, if the expression cannot be parsed. */ public static MemorySize parse(String text, MemoryUnit defaultUnit) throws IllegalArgumentException { if (!hasUnit(text)) { return parse(text + defaultUnit.getUnits()[0]); } return parse(text); } /** * Parses the given string as bytes. * The supported expressions are listed under {@link MemorySize}. * * @param text The string to parse * @return The parsed size, in bytes. * * @throws IllegalArgumentException Thrown, if the expression cannot be parsed. */ public static long parseBytes(String text) throws IllegalArgumentException { checkNotNull(text, "text"); final String trimmed = text.trim(); checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string"); final int len = trimmed.length(); int pos = 0; char current; while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') { pos++; } final String number = trimmed.substring(0, pos); final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US); if (number.isEmpty()) { throw new NumberFormatException("text does not start with a number"); } final long value; try { value = Long.parseLong(number); // this throws a NumberFormatException on overflow } catch (NumberFormatException e) { throw new IllegalArgumentException("The value '" + number + "' cannot be re represented as 64bit number (numeric overflow)."); } final long multiplier; if (unit.isEmpty()) { multiplier = 1L; } else { if (matchesAny(unit, BYTES)) { multiplier = 1L; } else if (matchesAny(unit, KILO_BYTES)) { multiplier = 1024L; } else if (matchesAny(unit, MEGA_BYTES)) { multiplier = 1024L * 1024L; } else if (matchesAny(unit, GIGA_BYTES)) { multiplier = 1024L * 1024L * 1024L; } else if (matchesAny(unit, TERA_BYTES)) { multiplier = 1024L * 1024L * 1024L * 1024L; } else { throw new IllegalArgumentException("Memory 
size unit '" + unit + "' does not match any of the recognized units: " + MemoryUnit.getAllUnits()); } } final long result = value * multiplier; // check for overflow if (result / multiplier != value) { throw new IllegalArgumentException("The value '" + text + "' cannot be re represented as 64bit number of bytes (numeric overflow)."); } return result; } private static boolean matchesAny(String str, MemoryUnit unit) { for (String s : unit.getUnits()) { if (s.equals(str)) { return true; } } return false; } //...... }
flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/MemorySize.java
/**
 * Enum which defines memory unit, mostly used to parse value from configuration file.
 *
 * <p>To make larger values more compact, the common size suffixes are supported:
 *
 * <ul>
 *     <li>1 or 1b or 1bytes (bytes)
 *     <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)
 *     <li>1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)
 *     <li>1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)
 *     <li>1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)
 * </ul>
 *
 */
public enum MemoryUnit {

    BYTES(new String[] { "b", "bytes" }),
    KILO_BYTES(new String[] { "k", "kb", "kibibytes" }),
    MEGA_BYTES(new String[] { "m", "mb", "mebibytes" }),
    GIGA_BYTES(new String[] { "g", "gb", "gibibytes" }),
    TERA_BYTES(new String[] { "t", "tb", "tebibytes" });

    /** The accepted lower-case spellings of this unit; the first entry is the shortest form. */
    private final String[] units;

    MemoryUnit(String[] units) {
        this.units = units;
    }

    /**
     * Returns the accepted spellings of this unit.
     *
     * @return a copy of the spellings, so callers cannot modify the enum's own array
     */
    public String[] getUnits() {
        return units.clone();
    }

    /**
     * Renders all recognized unit spellings in the form
     * {@code (b | bytes) / (k | kb | kibibytes) / ...}, e.g. for use in error messages.
     */
    public static String getAllUnits() {
        return concatenateUnits(
            BYTES.getUnits(),
            KILO_BYTES.getUnits(),
            MEGA_BYTES.getUnits(),
            GIGA_BYTES.getUnits(),
            TERA_BYTES.getUnits());
    }

    /**
     * Checks whether the given text has a non-empty remainder after its leading digits.
     *
     * @param text the text to inspect; must not be null, empty or whitespace-only
     * @return {@code true} if anything other than whitespace follows the leading run of
     *         digits; the remainder is not validated against the known units
     */
    public static boolean hasUnit(String text) {
        checkNotNull(text, "text");

        final String trimmed = text.trim();
        checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");

        final int len = trimmed.length();
        int pos = 0;

        // skip the leading run of ASCII digits
        char current;
        while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
            pos++;
        }

        final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);

        return unit.length() > 0;
    }

    /**
     * Joins the spellings of each group with {@code " | "}, wraps each group in parentheses
     * and joins the groups with {@code " / "}.
     */
    private static String concatenateUnits(final String[]... allUnits) {
        final StringBuilder builder = new StringBuilder(128);

        // Separators are written before every element but the first, so empty input is
        // handled without trimming the builder to a negative length.
        boolean firstGroup = true;
        for (String[] units : allUnits) {
            if (!firstGroup) {
                builder.append(" / ");
            }
            firstGroup = false;

            builder.append('(');
            boolean firstUnit = true;
            for (String unit : units) {
                if (!firstUnit) {
                    builder.append(" | ");
                }
                firstUnit = false;
                builder.append(unit);
            }
            builder.append(')');
        }

        return builder.toString();
    }
}
flink-1.7.1/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId> { //...... private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) { if (cmd.hasOption(container.getOpt())) { // number of containers is required option! LOG.info("The argument {} is deprecated in will be ignored.", container.getOpt()); } // TODO: The number of task manager should be deprecated soon final int numberTaskManagers; if (cmd.hasOption(container.getOpt())) { numberTaskManagers = Integer.valueOf(cmd.getOptionValue(container.getOpt())); } else { numberTaskManagers = 1; } // JobManager Memory final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes(); // Task Managers memory final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes(); int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); return new ClusterSpecification.ClusterSpecificationBuilder() .setMasterMemoryMB(jobManagerMemoryMB) .setTaskManagerMemoryMB(taskManagerMemoryMB) .setNumberTaskManagers(numberTaskManagers) .setSlotsPerTaskManager(slotsPerTaskManager) .createClusterSpecification(); } //...... }
flink-1.7.1/flink-dist/src/main/flink-bin/bin/config.sh
# ...... (excerpt of config.sh; elided parts are marked with "# ......")

DEFAULT_ENV_PID_DIR="/tmp"                          # Directory to store *.pid files to
DEFAULT_ENV_LOG_MAX=5                               # Maximum number of old log files to keep
DEFAULT_ENV_JAVA_OPTS=""                            # Optional JVM args
DEFAULT_ENV_JAVA_OPTS_JM=""                         # Optional JVM args (JobManager)
DEFAULT_ENV_JAVA_OPTS_TM=""                         # Optional JVM args (TaskManager)
DEFAULT_ENV_JAVA_OPTS_HS=""                         # Optional JVM args (HistoryServer)
DEFAULT_ENV_SSH_OPTS=""                             # Optional SSH parameters running in cluster mode
DEFAULT_YARN_CONF_DIR=""                            # YARN Configuration Directory, if necessary
DEFAULT_HADOOP_CONF_DIR=""                          # Hadoop Configuration Directory, if necessary

# ......

# Define FLINK_JM_HEAP if it is not already set
# (an exported FLINK_JM_HEAP wins; otherwise the config value is read, with 0 as fallback)
if [ -z "${FLINK_JM_HEAP}" ]; then
    FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
fi

# Try read old config key, if new key not exists
# (FLINK_JM_HEAP is 0 exactly when the new key produced the fallback value above)
if [ "${FLINK_JM_HEAP}" == 0 ]; then
    FLINK_JM_HEAP_MB=$(readFromConfig ${KEY_JOBM_MEM_MB} 0 "${YAML_CONF}")
fi

# ......

# Generic JVM options: keep an already exported value, else read it from the config
if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
    FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")

    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

# JobManager-specific JVM options: same scheme as above
if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
    FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")

    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

# ......

# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
if [ -z "${JVM_ARGS}" ]; then
    JVM_ARGS=""
fi

# ......
flink-1.7.1/flink-dist/src/main/flink-bin/bin/jobmanager.sh
#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Start/stop a Flink JobManager.
USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"

STARTSTOP=$1
HOST=$2 # optional when starting multiple instances
WEBUIPORT=$3 # optional when starting multiple instances

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  # quoted so that "[host]" / "[webui-port]" are not treated as glob patterns
  echo "$USAGE"
  exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# Loads the shared settings and helpers used below
. "$bin"/config.sh

ENTRYPOINT=standalonesession

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
    # Only the deprecated megabyte key is configured: use FLINK_JM_HEAP_MB as read by
    # config.sh. Otherwise derive the megabyte value from FLINK_JM_HEAP.
    if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then
        # backticks are escaped so they are printed literally instead of being executed
        echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`"
    else
        flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})
        FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})
    fi

    if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then
        echo "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
        exit 1
    fi

    # A positive heap size is applied as both initial and maximum heap
    if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then
        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m"
    fi

    # Add JobManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"

    # Startup parameters
    args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
    if [ -n "${HOST}" ]; then
        args+=("--host")
        args+=("${HOST}")
    fi

    if [ -n "${WEBUIPORT}" ]; then
        args+=("--webui-port")
        args+=("${WEBUIPORT}")
    fi
fi

# Foreground start replaces this shell with the console launcher; every other action
# (start, stop, stop-all) is delegated to the daemon script.
if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else
    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
fi
jobmanager.sh首先调用config.sh来初始化相关变量(FLINK_JM_HEAP、FLINK_JM_HEAP_MB、FLINK_ENV_JAVA_OPTS、FLINK_ENV_JAVA_OPTS_JM、JVM_ARGS);随后将FLINK_ENV_JAVA_OPTS_JM(依据env.java.opts.jobmanager配置)追加到FLINK_ENV_JAVA_OPTS(依据env.java.opts配置)中
flink-1.7.1/flink-dist/src/main/flink-bin/bin/flink-console.sh
#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Start a Flink service as a console application. Must be stopped with Ctrl-C
# or with SIGTERM by kill or the controlling process.
USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"

SERVICE=$1
ARGS=("${@:2}") # get remaining arguments as array

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

# Map the service name to the main class to launch
case $SERVICE in
    (taskexecutor)
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;

    (historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;

    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (standalonesession)
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
    ;;

    (*)
        echo "Unknown service '${SERVICE}'. $USAGE."
        exit 1
    ;;
esac

FLINK_TM_CLASSPATH=`constructFlinkClassPath`

log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")

# Reduce the first line of `java -version` to the first two version components
# concatenated (e.g. version "1.8.0_181" yields 18); "1q" stops after the first line.
JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')

# Only set JVM 8 arguments if we have correctly extracted the version
if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
    if [ "$JAVA_VERSION" -lt 18 ]; then
        JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
    fi
fi

echo "Starting $SERVICE as a console application on host $HOSTNAME."
# JVM_ARGS and FLINK_ENV_JAVA_OPTS are left unquoted on purpose so that they split into
# individual JVM arguments.
exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
jobmanager.sh首先调用config.sh来初始化相关变量(FLINK_JM_HEAP、FLINK_JM_HEAP_MB、FLINK_ENV_JAVA_OPTS、FLINK_ENV_JAVA_OPTS_JM、JVM_ARGS);如果FLINK_JM_HEAP值大于0,则解析到FLINK_JM_HEAP_MB变量,如果FLINK_JM_HEAP_MB大于0,则使用该值设置Xms及Xmx追加到JVM_ARGS变量中;它会将FLINK_ENV_JAVA_OPTS_JM(依据env.java.opts.jobmanager配置)追加到FLINK_ENV_JAVA_OPTS(依据env.java.opts配置)中;jobmanager.sh最后调用flink-console.sh(start-foreground模式)或flink-daemon.sh(其他模式)来启动相关类
由此可见最后的jvm参数取决于JVM_ARGS及FLINK_ENV_JAVA_OPTS;其中注意不要设置内存相关参数到JVM_ARGS,因为jobmanager.sh在FLINK_JM_HEAP_MB大于0时,会使用该值设置Xms及Xmx并追加到JVM_ARGS变量中,而FLINK_JM_HEAP_MB则取决于FLINK_JM_HEAP环境变量或者jobmanager.heap.size配置;FLINK_ENV_JAVA_OPTS的配置则取决于env.java.opts以及env.java.opts.jobmanager;因而要配置jobmanager的heap大小的话,要么指定FLINK_JM_HEAP环境变量(比如FLINK_JM_HEAP=512m),要么在flink-conf.yaml中指定jobmanager.heap.size或者env.java.opts以及env.java.opts.jobmanager