Flink MemorySegment

Flink作为一款高性能的流计算处理引擎 通过定制的序列化工具、高效的内存管理机制,缓存友好的数据结构、堆外内存的控制 以及 JIT编译等 使得在JVM上能够获得类似C极致的处理性能 本文将深入分析Flink是如何进行内存管理

Flink并不是将对象直接托管于JVM分配到堆上 而是将对象都序列化到一个块上 MemorySegment 是Flink内存分配的最小单元

数据结构

  • byte[] heapMemory 堆内内存引用
  • ByteBuffer offHeapBuffer 直接内存buffer引用 只要memsegment存活 该byte就不会被释放
  • long address 内存相对地址
  • long addressLimit 地址范围 address+ addressLimit
  • size 内存大小

具体MemorySegment的分配是由MemorySegmentFactory来实现 MemorySegment的整体分配机制类似于Java的ByteBuffer 但是相比于ByteBuffer Flink实现了特有的bytes的 compare, swap, and copy 功能、并提供了absolute positioning 的方法批量进行写操作来保证线程安全

数据拷贝

下面分析下wrapCopy 拷贝指定的内存创建新的segment的分配流程

1
2
3
4
5
6
7
8
9
10
11
// start 起始位
// end 结束位 不包括
public static MemorySegment wrapCopy(byte[] bytes, int start, int end)
throws IllegalArgumentException {
checkArgument(end >= start);
checkArgument(end <= bytes.length);
// 通用分配非池化的构造新的Segment 长度为 end-start
MemorySegment copy = allocateUnpooledSegment(end - start);
copy.put(0, bytes, start, copy.size());
return copy;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void put(int index, byte[] src, int offset, int length) {
// check the byte array offset and length
if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0) {
throw new IndexOutOfBoundsException();
}

//address 为byte arrary begining地址

//启始指针 address 为byte对象头的长度 16 + index启位置就是pos
final long pos = address + index;

// index > 0 且 没有超出 limit 调用UNSAFE copyMemory0 进行拷贝分配
if (index >= 0 && pos <= addressLimit - length) {
final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
UNSAFE.copyMemory(src, arrayAddress, heapMemory, pos, length);
} else if (address > addressLimit) {
throw new IllegalStateException("segment has been freed");
} else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}

字节序

除此之外 Flink实现了 big-endian 大端序 little-endian小端序 的访问方法 . 字节序是字节在内存中的存储顺序 不同的cpu存储方式会有差异 以x86处理器以little-endian排列
JIT 通过LITTLE_ENDIAN来判断是大端还是小端

1
LITTLE_ENDIAN ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN 
1
2
3
4
5
6
7
public char getCharLittleEndian(int index) {
if (LITTLE_ENDIAN) {
return getChar(index);
} else {
return Character.reverseBytes(getChar(index));
}
}

堆内和堆外

Flink在实现了堆内内存后 为了解决高效的网络IO 以及进程间的内存共享 也实现了堆外内存操作 通过sun.misc.Unsafe来进行堆外分配 在之前的实现中
MemorySegment是一个抽象类 从而衍生出HybridMemorySegment和HeapMemorySegment 堆外和堆内的实现 JIT编译时在没有子类的情况下所有的调用方法都是确定的,所有的function都是可以被de-virtualized和inlined 这可以极大地提高性能 具体可参考 Off-heap Memory in Apache Flink and the curious JIT compiler 所以在当前的Flink版本中通过MemorySegment 来同事实现操作堆内和堆外, 成员变量heapMemory为null时则代表堆外 否则为堆内 内部实现是共用的一套分配函数 Usafe提供了强大的堆外和堆内操作机制
以getLong函数分析

1
sun.misc.Unsafe.getLong(Object reference, long offset)

如果refer为null 则offset为操作的地址 从堆外取, 否则就会取该对象的地址 加上offset 加上8字节得到long

内存池化

在了解MemorySegment池化前 先了解下Flink的整体内存模型结构,Flink任务的执行Work节点是以TaskManager为粒度 一个TaskManager可以代表一个JVM进程 TaskManager的内存模型如下图所示

Flink JVM 进程的 进程总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。 其中,Flink 总内存(Total Flink Memory)包括 JVM 堆内存(Heap Memory)、托管内存(Managed Memory)以及其他直接内存(Direct Memory)或本地内存(Native Memory) 托管内存是由FLink分配的管理的堆外内存主要用作 RocksDB State Backend 以及流式作业中用于排序、哈希表及缓存中间结果

在这么多精细的内存控制下 其中NetWork和ManagedMemory 都是一组MemSegment集合 下面介绍下这两块内存是如何管理的以及其实现的设计模式

Buffer接口是对MemorySegment池化的包装实现 类似于Netty中的ByteBuf提供 readIndex、writerIndex 对应读写操作将Segment划分为三块区域discardable、readable、writable 其具体实现NetworkBuffer继承了AbstractReferenceCountedByteBuf 来实现实例的引用计数和读写指针 非netty场景下提供了手动setBytes的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//memorySegment 实例
private final MemorySegment memorySegment;

//buffer回收器
private final BufferRecycler recycler;

//buffer中data的类型 使用one bytes 只支持128种
private DataType dataType;

//ByteBufAllocator分配器 来源于netty
private ByteBufAllocator allocator;

// buffer size
private int currentSize;
// 是否开启压缩
private boolean isCompressed = false;

Flink 提供了线程安全的BufferBuilder 和BufferConsumer 读写模式 BufferBuilder向MemorySegment写数据 写线程和BufferConsumer读线程可处于不同线程中 BufferBuilder 提供了append和appendAndCommit方式来append ByteBuffer 通过PositionMarker来标记当前写的位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
interface PositionMarker {
int FINISHED_EMPTY = Integer.MIN_VALUE;

int get();

static boolean isFinished(int position) {
return position < 0;
}

static int getAbsolute(int position) {
if (position == FINISHED_EMPTY) {
return 0;
}
return Math.abs(position);
}
}

BufferPool 是一个动态的 buffer pool 继承了 BufferProvider BufferRecycler 定义申请buffer和回收buffer的功能LocalBufferPool是其具体实现 BufferPoolFactory用于创建buffer pools,整体流程是BufferPoolFactory创建buffer pool ,buffer pool持有 buffer,NetworkBufferPool是BufferPoolFactory的实现

下面看下NetWorkBufferPool的初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public NetworkBufferPool(
int numberOfSegmentsToAllocate, int segmentSize, Duration requestSegmentsTimeout) {
// 要分配的 内存段个数
this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
// 单个内存段的大小
this.memorySegmentSize = segmentSize;

Preconditions.checkNotNull(requestSegmentsTimeout);
checkArgument(
requestSegmentsTimeout.toMillis() > 0,
"The timeout for requesting exclusive buffers should be positive.");
this.requestSegmentsTimeout = requestSegmentsTimeout;

final long sizeInLong = (long) segmentSize;

try {
// 初始化 内存段双端队列
this.availableMemorySegments = new ArrayDeque<>(numberOfSegmentsToAllocate);
} catch (OutOfMemoryError err) {
throw new OutOfMemoryError(
"Could not allocate buffer queue of length "
+ numberOfSegmentsToAllocate
+ " - "
+ err.getMessage());
}

try {
//分配 堆外的非池化的segment
for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
availableMemorySegments.add(
MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));
}
} catch (OutOfMemoryError err) {
int allocated = availableMemorySegments.size();

// free some memory
availableMemorySegments.clear();

long requiredMb = (sizeInLong * numberOfSegmentsToAllocate) >> 20;
long allocatedMb = (sizeInLong * allocated) >> 20;
long missingMb = requiredMb - allocatedMb;

throw new OutOfMemoryError(
"Could not allocate enough memory segments for NetworkBufferPool "
+ "(required (MB): "
+ requiredMb
+ ", allocated (MB): "
+ allocatedMb
+ ", missing (MB): "
+ missingMb
+ "). Cause: "
+ err.getMessage());
}

// 设置 AvailabilityProvider 状态
availabilityHelper.resetAvailable();

long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20;

LOG.info(
"Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
allocatedMb,
availableMemorySegments.size(),
segmentSize);
}

下篇文章将会分析其基于MemorySegment的序列化机制