wgpu/util/belt.rs

use crate::{
    util::align_to, Buffer, BufferAddress, BufferDescriptor, BufferSize, BufferUsages,
    BufferViewMut, CommandEncoder, Device, MapMode,
};
use std::fmt;
use std::sync::{mpsc, Arc};

struct Chunk {
    buffer: Arc<Buffer>,
    size: BufferAddress,
    offset: BufferAddress,
}

/// Efficiently performs many buffer writes by sharing and reusing temporary buffers.
///
/// Internally it uses a ring-buffer of staging buffers that are sub-allocated.
/// Its advantage over [`Queue::write_buffer()`] is that it returns a mutable slice,
/// which you can fill in place to avoid an extra data copy.
///
/// Using a staging belt is slightly complicated, and generally goes as follows (see the
/// sketch below):
/// 1. Write to the buffers that need updating using [`StagingBelt::write_buffer()`].
/// 2. Call [`StagingBelt::finish()`].
/// 3. Submit all command encoders that were used in step 1.
/// 4. Call [`StagingBelt::recall()`].
///
/// [`Queue::write_buffer()`]: crate::Queue::write_buffer
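///
/// A minimal sketch of this sequence (a hedged example: it assumes a `device`,
/// `queue`, and a `target` buffer with `COPY_DST` usage already exist):
///
/// ```no_run
/// # fn example(device: &wgpu::Device, queue: &wgpu::Queue, target: &wgpu::Buffer) {
/// use wgpu::util::StagingBelt;
///
/// let mut belt = StagingBelt::new(1024);
/// let mut encoder =
///     device.create_command_encoder(&wgpu::CommandEncoderDescriptor { label: None });
///
/// // 1. Write through the belt, filling the returned slice directly.
/// belt.write_buffer(
///     &mut encoder,
///     target,
///     0,
///     wgpu::BufferSize::new(16).unwrap(),
///     device,
/// )
/// .copy_from_slice(&[0u8; 16]);
///
/// // 2. Close the staging buffers used for this submission.
/// belt.finish();
///
/// // 3. Submit the encoder(s) that recorded the copies.
/// queue.submit(Some(encoder.finish()));
///
/// // 4. Reclaim the staging buffers so they can be reused.
/// belt.recall();
/// # }
/// ```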
pub struct StagingBelt {
    chunk_size: BufferAddress,
    /// Chunks into which we are accumulating data to be transferred.
    active_chunks: Vec<Chunk>,
    /// Chunks that have scheduled transfers already; they are unmapped and some
    /// command encoder has one or more `copy_buffer_to_buffer` commands with them
    /// as source.
    closed_chunks: Vec<Chunk>,
    /// Chunks that are back from the GPU and ready to be mapped for write and put
    /// into `active_chunks`.
    free_chunks: Vec<Chunk>,
    /// When closed chunks are mapped again, the map callback sends them here.
    sender: mpsc::Sender<Chunk>,
    /// Free chunks are received here to be put on `self.free_chunks`.
    receiver: mpsc::Receiver<Chunk>,
}

impl StagingBelt {
    /// Create a new staging belt.
    ///
    /// The `chunk_size` is the unit of internal buffer allocation; writes will be
    /// sub-allocated within each chunk. Therefore, for optimal use of memory, the
    /// chunk size should be:
    ///
    /// * larger than the largest single [`StagingBelt::write_buffer()`] operation;
    /// * between a quarter of and the full amount of data uploaded per submission
    ///   (per [`StagingBelt::finish()`]); and
    /// * bigger is better, within these bounds.
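    ///
    /// For example (a sketch only, assuming a workload that uploads roughly 64 KiB
    /// per submission, spread across many smaller writes), a 32 KiB chunk size sits
    /// within these guidelines:
    ///
    /// ```
    /// use wgpu::util::StagingBelt;
    ///
    /// // Hypothetical sizing: ~64 KiB uploaded per submission => 32 KiB chunks.
    /// let belt = StagingBelt::new(32 * 1024);
    /// ```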
    pub fn new(chunk_size: BufferAddress) -> Self {
        let (sender, receiver) = mpsc::channel();
        StagingBelt {
            chunk_size,
            active_chunks: Vec::new(),
            closed_chunks: Vec::new(),
            free_chunks: Vec::new(),
            sender,
            receiver,
        }
    }

    /// Allocate a staging belt slice of `size` bytes to be uploaded into the `target`
    /// buffer at the specified offset.
    ///
    /// The copy is recorded into the provided command encoder. This encoder
    /// must be submitted after [`StagingBelt::finish()`] is called and before
    /// [`StagingBelt::recall()`] is called.
    ///
    /// If the `size` is greater than the size of any free internal buffer, a new buffer
    /// will be allocated for it. Therefore, the `chunk_size` passed to [`StagingBelt::new()`]
    /// should ideally be larger than every such size.
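    ///
    /// A short sketch of filling the returned slice (assumes `belt`, `encoder`,
    /// `target`, and `device` are set up as in the type-level example above):
    ///
    /// ```no_run
    /// # fn example(
    /// #     belt: &mut wgpu::util::StagingBelt,
    /// #     encoder: &mut wgpu::CommandEncoder,
    /// #     target: &wgpu::Buffer,
    /// #     device: &wgpu::Device,
    /// # ) {
    /// let data = [1u8, 2, 3, 4, 5, 6, 7, 8];
    /// let mut view = belt.write_buffer(
    ///     encoder,
    ///     target,
    ///     0,
    ///     wgpu::BufferSize::new(data.len() as u64).unwrap(),
    ///     device,
    /// );
    /// view.copy_from_slice(&data);
    /// // `view` is dropped here, before `StagingBelt::finish()` unmaps the chunk.
    /// # }
    /// ```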
    pub fn write_buffer(
        &mut self,
        encoder: &mut CommandEncoder,
        target: &Buffer,
        offset: BufferAddress,
        size: BufferSize,
        device: &Device,
    ) -> BufferViewMut {
        // Prefer an active chunk with enough remaining space; otherwise take a
        // mapped free chunk, or allocate a fresh staging buffer as a last resort.
        let mut chunk = if let Some(index) = self
            .active_chunks
            .iter()
            .position(|chunk| chunk.offset + size.get() <= chunk.size)
        {
            self.active_chunks.swap_remove(index)
        } else {
            self.receive_chunks(); // ensure self.free_chunks is up to date

            if let Some(index) = self
                .free_chunks
                .iter()
                .position(|chunk| size.get() <= chunk.size)
            {
                self.free_chunks.swap_remove(index)
            } else {
                let size = self.chunk_size.max(size.get());
                Chunk {
                    buffer: Arc::new(device.create_buffer(&BufferDescriptor {
                        label: Some("(wgpu internal) StagingBelt staging buffer"),
                        size,
                        usage: BufferUsages::MAP_WRITE | BufferUsages::COPY_SRC,
                        mapped_at_creation: true,
                    })),
                    size,
                    offset: 0,
                }
            }
        };

        // Record the copy into the target buffer, then advance the chunk's write
        // offset, keeping it aligned so the next mapped range starts on a valid boundary.
        encoder.copy_buffer_to_buffer(&chunk.buffer, chunk.offset, target, offset, size.get());
        let old_offset = chunk.offset;
        chunk.offset = align_to(chunk.offset + size.get(), crate::MAP_ALIGNMENT);

        self.active_chunks.push(chunk);
        self.active_chunks
            .last()
            .unwrap()
            .buffer
            .slice(old_offset..old_offset + size.get())
            .get_mapped_range_mut()
    }

    /// Prepare currently mapped buffers for use in a submission.
    ///
    /// This must be called before the command encoder(s) provided to
    /// [`StagingBelt::write_buffer()`] are submitted.
    ///
    /// At this point, all the partially used staging buffers are closed (cannot be used for
    /// further writes) until after [`StagingBelt::recall()`] is called *and* the GPU is done
    /// copying the data from them.
    pub fn finish(&mut self) {
        for chunk in self.active_chunks.drain(..) {
            chunk.buffer.unmap();
            self.closed_chunks.push(chunk);
        }
    }

    /// Recall all of the closed buffers back to be reused.
    ///
    /// This must only be called after the command encoder(s) provided to
    /// [`StagingBelt::write_buffer()`] are submitted. Additional calls are harmless.
    /// Not calling this as soon as possible may result in increased buffer memory usage.
    pub fn recall(&mut self) {
        self.receive_chunks();

        // Re-map each closed chunk; when mapping completes, the callback sends the
        // chunk back through the channel so `receive_chunks` can reclaim it.
        let sender = &self.sender;
        for chunk in self.closed_chunks.drain(..) {
            let sender = sender.clone();
            chunk
                .buffer
                .clone()
                .slice(..)
                .map_async(MapMode::Write, move |_| {
                    let _ = sender.send(chunk);
                });
        }
    }

    /// Move all chunks that the GPU is done with (and are now mapped again)
    /// from `self.receiver` to `self.free_chunks`.
    fn receive_chunks(&mut self) {
        while let Ok(mut chunk) = self.receiver.try_recv() {
            chunk.offset = 0;
            self.free_chunks.push(chunk);
        }
    }
}

impl fmt::Debug for StagingBelt {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("StagingBelt")
            .field("chunk_size", &self.chunk_size)
            .field("active_chunks", &self.active_chunks.len())
            .field("closed_chunks", &self.closed_chunks.len())
            .field("free_chunks", &self.free_chunks.len())
            .finish_non_exhaustive()
    }
}