Flink Parameter Tuning

IllegalConfigurationException

If you encounter an IllegalConfigurationException thrown from TaskExecutorProcessUtils, it usually means the configuration contains an invalid value (e.g., a negative memory size, or a fraction greater than 1) or conflicting options. Check the exception message and consult the section of the memory model documentation that corresponds to the memory component in question.

OutOfMemoryError: Java heap space

This exception indicates that the JVM heap is too small. You can enlarge the JVM heap by increasing the total memory or the task heap memory.
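As a sketch, the task heap could be enlarged in flink-conf.yaml (the value below is illustrative, not a recommendation):

```yaml
# Illustrative value; size according to your workload.
taskmanager.memory.task.heap.size: 2048m
```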

Tip: You can also increase the framework heap memory. This is an advanced option and should only be changed if you have confirmed that the Flink framework itself needs more memory.

OutOfMemoryError: Direct buffer memory

This exception usually indicates that the JVM direct memory limit is too small, or that there is a direct memory leak. Check whether user code or its external dependencies use JVM direct memory and, if so, whether enough memory has been configured for it. You can increase the direct memory limit by adjusting off-heap memory. See also how to configure off-heap memory and the JVM arguments that Flink sets.
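As a sketch, off-heap memory could be enlarged in flink-conf.yaml (values illustrative; per the option descriptions below, both are fully counted toward the JVM max direct memory limit):

```yaml
# Illustrative values; adjust to the direct memory your code and dependencies actually use.
taskmanager.memory.task.off-heap.size: 512m
taskmanager.memory.framework.off-heap.size: 256m
```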

OutOfMemoryError: Metaspace

This exception indicates that the JVM Metaspace limit is too small. You can try adjusting the JVM Metaspace option.
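For example, in flink-conf.yaml (value illustrative):

```yaml
# Illustrative value; the default is 256 mb.
taskmanager.memory.jvm-metaspace.size: 512m
```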

Configuration options (Key / Default / Type / Description):

taskmanager.memory.flink.size
    Default: (none) | Type: MemorySize
    Total Flink Memory size for the TaskExecutors. This includes all the memory that a TaskExecutor consumes, except for JVM Metaspace and JVM Overhead. It consists of Framework Heap Memory, Task Heap Memory, Task Off-Heap Memory, Managed Memory, and Network Memory. See also 'taskmanager.memory.process.size' for total process memory size configuration.

taskmanager.memory.framework.heap.size
    Default: 128 mb | Type: MemorySize
    Framework Heap Memory size for TaskExecutors. This is the size of JVM heap memory reserved for the TaskExecutor framework, which will not be allocated to task slots.

taskmanager.memory.framework.off-heap.size
    Default: 128 mb | Type: MemorySize
    Framework Off-Heap Memory size for TaskExecutors. This is the size of off-heap memory (JVM direct memory and native memory) reserved for the TaskExecutor framework, which will not be allocated to task slots. The configured value will be fully counted when Flink calculates the JVM max direct memory size parameter.

taskmanager.memory.jvm-metaspace.size
    Default: 256 mb | Type: MemorySize
    JVM Metaspace Size for the TaskExecutors.

taskmanager.memory.jvm-overhead.fraction
    Default: 0.1 | Type: Float
    Fraction of Total Process Memory to be reserved for JVM Overhead. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates the JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min/max size to the same value.

taskmanager.memory.jvm-overhead.max
    Default: 1 gb | Type: MemorySize
    Max JVM Overhead size for the TaskExecutors. Derived and clamped as described for 'taskmanager.memory.jvm-overhead.fraction'.

taskmanager.memory.jvm-overhead.min
    Default: 192 mb | Type: MemorySize
    Min JVM Overhead size for the TaskExecutors. Derived and clamped as described for 'taskmanager.memory.jvm-overhead.fraction'.

taskmanager.memory.managed.fraction
    Default: 0.4 | Type: Float
    Fraction of Total Flink Memory to be used as Managed Memory, if Managed Memory size is not explicitly specified.

taskmanager.memory.managed.size
    Default: (none) | Type: MemorySize
    Managed Memory size for TaskExecutors. This is the size of off-heap memory managed by the memory manager, reserved for sorting, hash tables, caching of intermediate results and the RocksDB state backend. Memory consumers can either allocate memory from the memory manager in the form of MemorySegments, or reserve bytes from the memory manager and keep their memory usage within that boundary. If unspecified, it will be derived to make up the configured fraction of the Total Flink Memory.

taskmanager.memory.network.fraction
    Default: 0.1 | Type: Float
    Fraction of Total Flink Memory to be used as Network Memory. Network Memory is off-heap memory reserved for the ShuffleEnvironment (e.g., network buffers). Network Memory size is derived to make up the configured fraction of the Total Flink Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of Network Memory can be explicitly specified by setting the min/max size to the same value.

taskmanager.memory.network.max
    Default: 1 gb | Type: MemorySize
    Max Network Memory size for TaskExecutors. Derived and clamped as described for 'taskmanager.memory.network.fraction'.

taskmanager.memory.network.min
    Default: 64 mb | Type: MemorySize
    Min Network Memory size for TaskExecutors. Derived and clamped as described for 'taskmanager.memory.network.fraction'.

taskmanager.memory.process.size
    Default: (none) | Type: MemorySize
    Total Process Memory size for the TaskExecutors. This includes all the memory that a TaskExecutor consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. On containerized setups, this should be set to the container memory. See also 'taskmanager.memory.flink.size' for total Flink memory size configuration.

taskmanager.memory.task.heap.size
    Default: (none) | Type: MemorySize
    Task Heap Memory size for TaskExecutors. This is the size of JVM heap memory reserved for tasks. If not specified, it will be derived as Total Flink Memory minus Framework Heap Memory, Task Off-Heap Memory, Managed Memory and Network Memory.

taskmanager.memory.task.off-heap.size
    Default: 0 bytes | Type: MemorySize
    Task Off-Heap Memory size for TaskExecutors. This is the size of off-heap memory (JVM direct memory and native memory) reserved for tasks. The configured value will be fully counted when Flink calculates the JVM max direct memory size parameter.
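The fraction/min/max interplay described for JVM Overhead above can be sketched as follows. This is a simplified model of the derivation rule, not Flink's actual code; the defaults (fraction 0.1, min 192 mb, max 1 gb) are taken from the table.

```python
# Simplified sketch of how a fraction-based memory component (here JVM
# Overhead) is derived from Total Process Memory and clamped to [min, max].
MB = 1024 * 1024

def derive_jvm_overhead(process_size, fraction=0.1,
                        min_size=192 * MB, max_size=1024 * MB):
    """Derived size = fraction of process memory, clamped to [min, max]."""
    derived = int(process_size * fraction)
    return max(min_size, min(max_size, derived))

# 4 GiB process memory -> 0.1 * 4096 MB = 409 MB, within [192 MB, 1 GB]:
print(derive_jvm_overhead(4096 * MB) // MB)  # -> 409
```

Setting min and max to the same value makes the clamp collapse to that value, which is exactly how the docs say the size can be pinned.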

IOException: Insufficient number of network buffers

This exception usually indicates that the network memory is too small. You can increase network memory by adjusting the network memory options (taskmanager.memory.network.min, taskmanager.memory.network.max, taskmanager.memory.network.fraction).
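As noted in the option descriptions above, the exact network memory size can be pinned by setting min and max to the same value; for example (values illustrative):

```yaml
# Illustrative values; min == max fixes Network Memory at exactly 1 GB.
taskmanager.memory.network.min: 1024m
taskmanager.memory.network.max: 1024m
# Alternatively, let it be derived as a fraction of Total Flink Memory:
# taskmanager.memory.network.fraction: 0.1
```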

Container Memory Exceeded

If a TaskExecutor container tries to allocate more memory than it requested (on Yarn, Mesos, or Kubernetes), this usually means Flink has not reserved enough native memory. You can detect container memory overuse through an external monitoring system, or from the error message produced when the deployment environment kills the container.

If RocksDBStateBackend is used and its memory control is not enabled, you can also try increasing managed memory.

In addition, you can try increasing the JVM overhead.

Managed memory can be specified in either of two ways: explicitly, via taskmanager.memory.managed.size, or as a fraction of total Flink memory, via taskmanager.memory.managed.fraction.

When both are specified, the explicit size takes precedence. If neither is specified, the size is derived from the default fraction.
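For example, in flink-conf.yaml (values illustrative):

```yaml
# Way 1: explicit size (takes precedence when both are set).
taskmanager.memory.managed.size: 4096m
# Way 2: fraction of Total Flink Memory (used only if no explicit size is set).
# taskmanager.memory.managed.fraction: 0.4
```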

How to Configure Memory

Memory Configuration in Standalone Deployment
In standalone deployment, we usually care more about the memory used by the Flink application itself. It is recommended to configure the total Flink memory (taskmanager.memory.flink.size) or its components. If Metaspace runs short, the JVM Metaspace size can also be adjusted.

In this case there is usually no need to configure the total process memory, because neither Flink nor the deployment environment limits the JVM overhead; it is bounded only by the machine's physical resources.
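A minimal standalone sketch in flink-conf.yaml (values illustrative):

```yaml
# Illustrative standalone setup: cap total Flink memory and, if needed, Metaspace.
taskmanager.memory.flink.size: 8192m
taskmanager.memory.jvm-metaspace.size: 512m
```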

Container Memory Configuration
In containerized deployments (Kubernetes, Yarn, or Mesos), it is recommended to configure the total process memory (taskmanager.memory.process.size). This option specifies the total memory allocated to the Flink JVM process, i.e., the size of the container to request.

Tip: If the total Flink memory is configured instead, Flink automatically adds the JVM-related memory components and requests a container of the derived total process size. See the memory model documentation for details.

Note: If Flink or user code allocates unmanaged off-heap (native) memory beyond the container size, the deployment environment may kill the over-consuming container and cause the job to fail.
See the description under container memory exceeded.
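A minimal containerized sketch in flink-conf.yaml (value illustrative):

```yaml
# Illustrative containerized setup: the process size equals the container size to request.
taskmanager.memory.process.size: 4096m
```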

State Backend Memory Configuration
When deploying a Flink streaming application, the cluster configuration can be tuned according to the type of state backend in use.

Heap State Backend
When running stateless jobs or using a heap state backend (MemoryStateBackend or FsStateBackend), it is recommended to set managed memory to 0. This maximizes the memory available to user code on the JVM heap.
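In flink-conf.yaml this amounts to:

```yaml
# Heap state backend / stateless job: leave managed memory to user code instead.
taskmanager.memory.managed.size: 0
```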

RocksDB State Backend
RocksDBStateBackend uses native memory. By default, RocksDB limits its memory usage to the configured managed memory, so it is important to configure enough managed memory when storing state this way. If RocksDB's memory control is disabled, then in containerized deployments the TaskExecutor may be killed by the deployment environment if RocksDB allocates memory beyond the requested container size (total process memory). See also how to tune RocksDB memory and state.backend.rocksdb.memory.managed.
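A sketch in flink-conf.yaml (the fraction value is illustrative):

```yaml
# Illustrative RocksDB setup: keep memory control on and grant generous managed memory.
state.backend.rocksdb.memory.managed: true
taskmanager.memory.managed.fraction: 0.5
```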

Memory Configuration for Batch Jobs
Flink's batch operators use managed memory to run more efficiently. Some operations can be performed directly on raw data without deserializing it into Java objects, so managed memory has a tangible impact on application performance. Flink therefore tries to allocate as much managed memory as possible without exceeding the configured limit. Because Flink knows exactly how much memory it may use, it can effectively avoid OutOfMemoryError; when managed memory runs short, Flink gracefully spills data to disk.

If the resources that can be requested are insufficient, adjust the yarn-site.xml configuration in Hadoop to increase the available memory, sized according to the current servers and leaving some memory for other services. When a Flink job starts, the TaskManager memory it requests determines how many TaskManagers can run on each node, and the number of slots configured (or specified) per node must also be set reasonably. Depending on the data volume, around 2 GB per slot is typical; with 50 nodes and 8 slots per node, the maximum parallelism is 400 slots. In general: from the current maximum available memory, work out the usable memory per node; from the memory each TaskManager requests, work out the number of TaskManagers; then TaskManagers × slots gives the maximum number of available slots, and the parallelism of each job can be allocated accordingly.
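The sizing arithmetic above can be sketched as follows (all numbers are illustrative assumptions, not measurements from a real cluster):

```python
# Illustrative cluster-sizing arithmetic: how many slots (max parallelism)
# a cluster offers given per-node memory and per-TaskManager memory.
def max_parallelism(nodes, node_memory_gb, tm_memory_gb, slots_per_tm):
    """Max slots = (TaskManagers that fit per node) * nodes * slots per TM."""
    tms_per_node = node_memory_gb // tm_memory_gb
    return nodes * tms_per_node * slots_per_tm

# 50 nodes, 24 GB usable per node, one 16 GB TaskManager (8 slots * ~2 GB) per node:
print(max_parallelism(50, 24, 16, 8))  # -> 400
```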

    <property>
        <!-- Memory (MB) the NodeManager may allocate to containers on this node -->
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>24576</value>
    </property>
    <property>
        <!-- CPU vcores the NodeManager may allocate to containers on this node -->
        <name>yarn.nodemanager.resource.cpu-vcores</name>
        <value>15</value>
    </property>
    <property>
        <!-- Largest memory (MB) a single container may request -->
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>24576</value>
    </property>
    <property>
        <!-- Largest number of vcores a single container may request -->
        <name>yarn.scheduler.maximum-allocation-vcores</name>
        <value>15</value>
    </property>

 


Copyright notice: this is an original article by Baron_ND, released under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when reposting.