GlueGen v2.6.0-rc-20250712
GlueGen, Native Binding Generator for Java™ (public API).
MappedByteBufferInputStream.java
Go to the documentation of this file.
1/**
2 * Copyright 2014 JogAmp Community. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without modification, are
5 * permitted provided that the following conditions are met:
6 *
7 * 1. Redistributions of source code must retain the above copyright notice, this list of
8 * conditions and the following disclaimer.
9 *
10 * 2. Redistributions in binary form must reproduce the above copyright notice, this list
11 * of conditions and the following disclaimer in the documentation and/or other materials
12 * provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY JogAmp Community ``AS IS'' AND ANY EXPRESS OR IMPLIED
15 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
16 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JogAmp Community OR
17 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
20 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
21 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
22 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23 *
24 * The views and conclusions contained in the software and documentation are those of the
25 * authors and should not be interpreted as representing official policies, either expressed
26 * or implied, of JogAmp Community.
27 */
28package com.jogamp.common.nio;
29
30import java.io.IOException;
31import java.io.InputStream;
32import java.io.OutputStream;
33import java.io.PrintStream;
34import java.io.RandomAccessFile;
35import java.lang.ref.WeakReference;
36import java.nio.ByteBuffer;
37import java.nio.MappedByteBuffer;
38import java.nio.channels.FileChannel;
39import java.nio.channels.FileChannel.MapMode;
40
41import com.jogamp.common.os.Platform.OSType;
42
43import jogamp.common.Debug;
44import jogamp.common.os.PlatformPropsImpl;
45
46/**
47 * An {@link InputStream} implementation based on an underlying {@link FileChannel}'s memory mapped {@link ByteBuffer},
48 * {@link #markSupported() supporting} {@link #mark(int) mark} and {@link #reset()}.
49 * <p>
50 * Implementation allows full memory mapped {@link ByteBuffer} coverage via {@link FileChannel#map(MapMode, long, long) FileChannel}
51 * beyond its size limitation of {@link Integer#MAX_VALUE} utilizing an array of {@link ByteBuffer} slices.<br>
52 * </p>
53 * <p>
54 * Implementation further allows full random access via {@link #position()} and {@link #position(long)}
55 * and accessing the memory mapped {@link ByteBuffer} slices directly via {@link #currentSlice()} and {@link #nextSlice()}.
56 * </p>
57 * @since 2.3.0
58 */
59public class MappedByteBufferInputStream extends InputStream {
60 public static enum CacheMode {
61 /**
62 * Keep all previous lazily cached buffer slices alive, useful for hopping readers,
63 * i.e. random access via {@link MappedByteBufferInputStream#position(long) position(p)}
64 * or {@link MappedByteBufferInputStream#reset() reset()}.
65 * <p>
66 * Note that without flushing, the platform may fail memory mapping
67 * due to virtual address space exhaustion.<br>
68 * In such case an {@link OutOfMemoryError} may be thrown directly,
69 * or encapsulated as the {@link IOException#getCause() the cause}
70 * of a thrown {@link IOException}.
71 * </p>
72 */
74 /**
75 * Soft flush the previous lazily cached buffer slice when caching the next buffer slice,
76 * useful for sequential forward readers, as well as for hopping readers like {@link #FLUSH_NONE}
77 * in case of relatively short periods between hopping across slices.
78 * <p>
79 * Implementation clears the buffer slice reference
80 * while preserving a {@link WeakReference} to allow its resurrection if not yet
81 * {@link System#gc() garbage collected}.
82 * </p>
83 */
85 /**
86 * Hard flush the previous lazily cached buffer slice when caching the next buffer slice,
87 * useful for sequential forward readers.
88 * <p>
89 * Besides clearing the buffer slice reference,
90 * implementation attempts to hard flush the mapped buffer
91 * using a {@code sun.misc.Cleaner} by reflection.
92 * In case such method does not exist nor works, implementation falls back to {@link #FLUSH_PRE_SOFT}.
93 * </p>
94 * <p>
95 * This is the default.
96 * </p>
97 */
98 FLUSH_PRE_HARD
99 };
100
101 /**
102 * File resize interface allowing a file to change its size,
103 * e.g. via {@link RandomAccessFile#setLength(long)}.
104 */
105 public static interface FileResizeOp {
106 /**
107 * @param newSize the new file size
108 * @throws IOException if file size change is not supported or any other I/O error occurs
109 */
110 void setLength(final long newSize) throws IOException;
111 }
112 private static final FileResizeOp NoFileResize = new FileResizeOp() {
113 @Override
114 public void setLength(final long newSize) throws IOException {
115 throw new IOException("file size change not supported");
116 }
117 };
118
119 /**
120 * Default slice shift, i.e. 1L << shift, denoting slice size in MiB:
121 * <ul>
122 * <li>{@link Platform#is64Bit() 64bit machines} -> 30 = 1024 MiB</li>
123 * <li>{@link Platform#is32Bit() 32bit machines} -> 29 = 512 MiB</li>
124 * </ul>
125 * <p>
126 * In case the default is too much of-used up address-space, one may choose other values:
127 * <ul>
128 * <li>29 -> 512 MiB</li>
129 * <li>28 -> 256 MiB</li>
130 * <li>27 -> 128 MiB</li>
131 * <li>26 -> 64 MiB</li>
132 * </ul>
133 * </p>
134 */
135 public static final int DEFAULT_SLICE_SHIFT;
136
137 static final boolean DEBUG;
138
139 static {
140 if( PlatformPropsImpl.CPU_ARCH.is32Bit ) {
142 } else {
144 }
145
146 DEBUG = Debug.debug("ByteBufferInputStream");
147 }
148
149 private final int sliceShift;
150 private final FileChannel fc;
151 private final FileChannel.MapMode mmode;
152 private FileResizeOp fileResizeOp = NoFileResize;
153
154 private int sliceCount;
155 private ByteBuffer[] slices;
156 private WeakReference<ByteBuffer>[] slices2GC;
157 private long totalSize;
158 private int slicesEntries, slices2GCEntries;
159 private boolean synchronous;
160
161 private int refCount;
162
163 private CacheMode cmode;
164
165 private int sliceIdx;
166 private long mark;
167
168 final void dbgDump(final String prefix, final PrintStream out) {
169 int _slicesEntries = 0;
170 for(int i=0; i<sliceCount; i++) {
171 if( null != slices[i] ) {
172 _slicesEntries++;
173 }
174 }
175 int _slices2GCEntries = 0;
176 int _slices2GCAliveEntries = 0;
177 for(int i=0; i<sliceCount; i++) {
178 final WeakReference<ByteBuffer> ref = slices2GC[i];
179 if( null != ref ) {
180 _slices2GCEntries++;
181 if( null != ref.get() ) {
182 _slices2GCAliveEntries++;
183 }
184 }
185 }
186 long fcSz = 0, pos = 0, rem = 0;
187 if( fc.isOpen() ) {
188 try {
189 fcSz = fc.size();
190 } catch (final IOException e) {
191 e.printStackTrace();
192 }
193 }
194 if( 0 < refCount ) {
195 try {
196 pos = position();
197 rem = totalSize - pos;
198 } catch (final IOException e) {
199 e.printStackTrace();
200 }
201 }
202 final int sliceCount2 = null != slices ? slices.length : 0;
203 out.println(prefix+" refCount "+refCount+", fcSize "+fcSz+", totalSize "+totalSize);
204 out.println(prefix+" position "+pos+", remaining "+rem);
205 out.println(prefix+" mmode "+mmode+", cmode "+cmode+", fileResizeOp "+fileResizeOp);
206 out.println(prefix+" slice "+sliceIdx+" / "+sliceCount+" ("+sliceCount2+"), synchronous "+synchronous);
207 out.println(prefix+" mapped "+slicesEntries+" / "+_slicesEntries);
208 out.println(prefix+" GC-queue "+slices2GCEntries+" / "+_slices2GCEntries+" (alive "+_slices2GCAliveEntries+")");
209 out.println(prefix+" sliceShift "+sliceShift+" -> "+(1L << sliceShift));
210 }
211
212 MappedByteBufferInputStream(final FileChannel fc, final FileChannel.MapMode mmode, final CacheMode cmode,
213 final int sliceShift, final long totalSize, final int currSliceIdx) throws IOException {
214 this.sliceShift = sliceShift;
215 this.fc = fc;
216 this.mmode = mmode;
217
218 if( 0 > totalSize ) {
219 throw new IllegalArgumentException("Negative size "+totalSize);
220 }
221 // trigger notifyLengthChange
222 this.totalSize = -1;
223 this.sliceCount = 0;
224 notifyLengthChange( totalSize );
225
226 this.refCount = 1;
227 this.cmode = cmode;
228
229 this.sliceIdx = currSliceIdx;
230 this.mark = -1;
231
232 currentSlice().position(0);
233
234 if( MappedByteBufferInputStream.DEBUG ) {
235 this.dbgDump("CTOR", System.err);
236 }
237 }
238
239 /**
240 * Creates a new instance using the given {@link FileChannel}.
241 * <p>
242 * The {@link ByteBuffer} slices will be mapped lazily at first usage.
243 * </p>
244 * @param fileChannel the file channel to be mapped lazily.
245 * @param mmode the map mode, default is {@link FileChannel.MapMode#READ_ONLY}.
246 * @param cmode the caching mode, default is {@link CacheMode#FLUSH_PRE_HARD}.
247 * @param sliceShift the pow2 slice size, default is {@link #DEFAULT_SLICE_SHIFT}.
248 * @throws IOException
249 */
250 public MappedByteBufferInputStream(final FileChannel fileChannel,
251 final FileChannel.MapMode mmode,
252 final CacheMode cmode,
253 final int sliceShift) throws IOException {
254 this(fileChannel, mmode, cmode, sliceShift, fileChannel.size(), 0);
255 }
256
257 /**
258 * Creates a new instance using the given {@link FileChannel},
259 * given mapping-mode, given cache-mode and the {@link #DEFAULT_SLICE_SHIFT}.
260 * <p>
261 * The {@link ByteBuffer} slices will be mapped lazily at first usage.
262 * </p>
263 * @param fileChannel the file channel to be used.
264 * @param mmode the map mode, default is {@link FileChannel.MapMode#READ_ONLY}.
265 * @param cmode the caching mode, default is {@link CacheMode#FLUSH_PRE_HARD}.
266 * @throws IOException
267 */
268 public MappedByteBufferInputStream(final FileChannel fileChannel, final FileChannel.MapMode mmode, final CacheMode cmode) throws IOException {
269 this(fileChannel, mmode, cmode, DEFAULT_SLICE_SHIFT);
270 }
271
272 /**
273 * Creates a new instance using the given {@link FileChannel},
274 * {@link FileChannel.MapMode#READ_ONLY read-only} mapping mode, {@link CacheMode#FLUSH_PRE_HARD}
275 * and the {@link #DEFAULT_SLICE_SHIFT}.
276 * <p>
277 * The {@link ByteBuffer} slices will be mapped {@link FileChannel.MapMode#READ_ONLY} lazily at first usage.
278 * </p>
279 * @param fileChannel the file channel to be used.
280 * @throws IOException
281 */
282 public MappedByteBufferInputStream(final FileChannel fileChannel) throws IOException {
283 this(fileChannel, FileChannel.MapMode.READ_ONLY, CacheMode.FLUSH_PRE_HARD, DEFAULT_SLICE_SHIFT);
284 }
285
286 /**
287 * Enable or disable synchronous mode.
288 * <p>
289 * If synchronous mode is enabled, mapped buffers will be {@link #flush(boolean) flushed}
290 * if {@link #notifyLengthChange(long) resized}, <i>written to</i> or {@link #close() closing} in {@link FileChannel.MapMode#READ_WRITE read-write} mapping mode.
291 * </p>
292 * <p>
293 * If synchronous mode is enabled, {@link FileChannel#force(boolean)} is issued
294 * if {@link #setLength(long) resizing} or {@link #close() closing} and not in {@link FileChannel.MapMode#READ_ONLY read-only} mapping mode.
295 * </p>
296 * @param s {@code true} to enable synchronous mode
297 */
298 public final synchronized void setSynchronous(final boolean s) {
299 synchronous = s;
300 }
301 /**
302 * Return {@link #setSynchronous(boolean) synchronous mode}.
303 */
304 public final synchronized boolean getSynchronous() {
305 return synchronous ;
306 }
307
308 final synchronized void checkOpen() throws IOException {
309 if( 0 == refCount ) {
310 throw new IOException("stream closed");
311 }
312 }
313
314 @Override
315 public final synchronized void close() throws IOException {
316 if( 0 < refCount ) {
317 refCount--;
318 if( 0 == refCount ) {
319 try {
320 cleanAllSlices( true /* syncBuffer */ );
321 } finally {
322 flushImpl(true /* metaData */, false /* syncBuffer */);
323 fc.close();
324 mark = -1;
325 sliceIdx = -1;
326 super.close();
327 }
328 }
329 }
330 if( MappedByteBufferInputStream.DEBUG ) {
331 this.dbgDump("Close", System.err);
332 }
333 }
334
335 final FileChannel.MapMode getMapMode() { return mmode; }
336
337 /**
338 * @param fileResizeOp the new {@link FileResizeOp}.
339 * @throws IllegalStateException if attempting to set the {@link FileResizeOp} to a different value than before
340 */
341 public final synchronized void setFileResizeOp(final FileResizeOp fileResizeOp) throws IllegalStateException {
342 if( NoFileResize != this.fileResizeOp && this.fileResizeOp != fileResizeOp ) {
343 throw new IllegalStateException("FileResizeOp already set, this value differs");
344 }
345 this.fileResizeOp = null != fileResizeOp ? fileResizeOp : NoFileResize;
346 }
347
348 /**
349 * Resize the underlying {@link FileChannel}'s size and adjusting this instance
350 * via {@link #notifyLengthChange(long) accordingly}.
351 * <p>
352 * User must have a {@link FileResizeOp} {@link #setFileResizeOp(FileResizeOp) registered} before.
353 * </p>
354 * <p>
355 * Implementation calls {@link #notifyLengthChange(long)} after {@link FileResizeOp#setLength(long)}.
356 * </p>
357 * @param newTotalSize the new total size
358 * @throws IOException if no {@link FileResizeOp} has been {@link #setFileResizeOp(FileResizeOp) registered}
359 * or if a buffer slice operation failed
360 */
361 public final synchronized void setLength(final long newTotalSize) throws IOException {
362 final long currentPosition;
363 if( 0 != newTotalSize && totalSize != newTotalSize ) {
364 currentPosition = position();
365 } else {
366 currentPosition = -1L;
367 }
368 if( fc.size() != newTotalSize ) {
369 if( OSType.WINDOWS == PlatformPropsImpl.OS_TYPE ) {
370 // On Windows, we have to close all mapped slices.
371 // Otherwise we will receive:
372 // java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
373 // at java.io.RandomAccessFile.setLength(Native Method)
374 cleanAllSlices( synchronous );
375 }
376 fileResizeOp.setLength(newTotalSize);
377 if( synchronous ) {
378 // buffers will be synchronized in notifyLengthChangeImpl(..)
379 flushImpl( true /* metaData */, false /* syncBuffer */);
380 }
381 }
382 notifyLengthChangeImpl(newTotalSize, currentPosition);
383 }
384
385 /**
386 * Notify this instance that the underlying {@link FileChannel}'s size has been changed
387 * and adjusting this instances buffer slices and states accordingly.
388 * <p>
389 * Should be called by user API when aware of such event.
390 * </p>
391 * @param newTotalSize the new total size
392 * @throws IOException if a buffer slice operation failed
393 */
394 public final synchronized void notifyLengthChange(final long newTotalSize) throws IOException {
395 notifyLengthChangeImpl(newTotalSize, -1L);
396 }
397 private final synchronized void notifyLengthChangeImpl(final long newTotalSize, final long currentPosition) throws IOException {
398 /* if( DEBUG ) {
399 System.err.println("notifyLengthChange.0: "+totalSize+" -> "+newTotalSize);
400 dbgDump("notifyLengthChange.0:", System.err);
401 } */
402 if( totalSize == newTotalSize ) {
403 // NOP
404 return;
405 } else if( 0 == newTotalSize ) {
406 // ZERO - ensure one entry avoiding NULL checks
407 cleanAllSlices( synchronous );
408 @SuppressWarnings("unchecked")
409 final WeakReference<ByteBuffer>[] newSlices2GC = new WeakReference[ 1 ];
410 slices2GC = newSlices2GC;
411 slices = new ByteBuffer[1];
412 slices[0] = ByteBuffer.allocate(0);
413 sliceCount = 0;
414 totalSize = 0;
415 mark = -1;
416 sliceIdx = 0;
417 } else {
418 final long prePosition = 0 <= currentPosition ? currentPosition : position();
419
420 final long sliceSize = 1L << sliceShift;
421 final int newSliceCount = (int)( ( newTotalSize + ( sliceSize - 1 ) ) / sliceSize );
422 @SuppressWarnings("unchecked")
423 final WeakReference<ByteBuffer>[] newSlices2GC = new WeakReference[ newSliceCount ];
424 final ByteBuffer[] newSlices = new ByteBuffer[ newSliceCount ];
425 final int copySliceCount = Math.min(newSliceCount, sliceCount-1); // drop last (resize)
426 if( 0 <= copySliceCount ) {
427 if( 0 < copySliceCount ) {
428 System.arraycopy(slices2GC, 0, newSlices2GC, 0, copySliceCount);
429 System.arraycopy(slices, 0, newSlices, 0, copySliceCount);
430 }
431 for(int i=copySliceCount; i<sliceCount; i++) { // clip shrunken slices + 1 (last), incl. slices2GC!
432 cleanSlice(i, synchronous);
433 }
434 }
435 slices2GC = newSlices2GC;
436 slices = newSlices;
437 sliceCount = newSliceCount;
438 totalSize = newTotalSize;
439 if( newTotalSize < mark ) {
440 mark = -1;
441 }
442 position2( Math.min(prePosition, newTotalSize) ); // -> clipped position (set currSlice and re-map/-pos buffer)
443 }
444 if( MappedByteBufferInputStream.DEBUG ) {
445 this.dbgDump("NotifyLengthChange", System.err);
446 }
447 }
448
449 /**
450 * Similar to {@link OutputStream#flush()}, synchronizes all mapped buffers
451 * from local storage via {@link MappedByteBuffer#force()}
452 * as well as the {@link FileChannel#force(boolean)} w/o {@code metaData}.
453 * @param metaData TODO
454 * @throws IOException if this stream has been {@link #close() closed}.
455 */
456 public final synchronized void flush(final boolean metaData) throws IOException {
457 checkOpen();
458 flushImpl(metaData, true);
459 }
460 private final synchronized void flushImpl(final boolean metaData, final boolean syncBuffer) throws IOException {
461 if( FileChannel.MapMode.READ_ONLY != mmode ) {
462 if( syncBuffer && FileChannel.MapMode.READ_WRITE == mmode ) {
463 for(int i=0; i<sliceCount; i++) {
464 syncSlice(slices[i], true);
465 }
466 for(int i=0; i<sliceCount; i++) {
467 final WeakReference<ByteBuffer> ref = slices2GC[i];
468 if( null != ref ) {
469 syncSlice(ref.get(), true);
470 }
471 }
472 }
473 fc.force(metaData);
474 }
475 }
476
477
478 /**
479 * Returns a new MappedByteBufferOutputStream instance sharing
480 * all resources of this input stream, including all buffer slices.
481 *
482 * @throws IllegalStateException if attempting to set the {@link FileResizeOp} to a different value than before
483 * @throws IOException if this instance was opened w/ {@link FileChannel.MapMode#READ_ONLY}
484 * or if this stream has been {@link #close() closed}.
485 */
486 public final synchronized MappedByteBufferOutputStream getOutputStream(final FileResizeOp fileResizeOp)
487 throws IllegalStateException, IOException
488 {
489 checkOpen();
490 final MappedByteBufferOutputStream res = new MappedByteBufferOutputStream(this, fileResizeOp);
491 refCount++;
492 return res;
493 }
494
495 /**
496 * Return the mapped {@link ByteBuffer} slice at the current {@link #position()}.
497 * <p>
498 * Due to the nature of using sliced buffers mapping the whole region,
499 * user has to determine whether the returned buffer covers the desired region
500 * and may fetch the {@link #nextSlice()} until satisfied.<br>
501 * It is also possible to repeat this operation after reposition the stream via {@link #position(long)}
502 * or {@link #skip(long)} to a position within the next block, similar to {@link #nextSlice()}.
503 * </p>
504 * @throws IOException if a buffer slice operation failed.
505 */
506 public final synchronized ByteBuffer currentSlice() throws IOException {
507 final ByteBuffer s0 = slices[sliceIdx];
508 if ( null != s0 ) {
509 return s0;
510 } else {
511 if( CacheMode.FLUSH_PRE_SOFT == cmode ) {
512 final WeakReference<ByteBuffer> ref = slices2GC[sliceIdx];
513 if( null != ref ) {
514 final ByteBuffer mbb = ref.get();
515 slices2GC[sliceIdx] = null;
516 slices2GCEntries--;
517 if( null != mbb ) {
518 slices[sliceIdx] = mbb;
519 slicesEntries++;
520 return mbb;
521 }
522 }
523 }
524 final long pos = (long)sliceIdx << sliceShift;
525 final MappedByteBuffer s1 = fc.map(mmode, pos, Math.min(1L << sliceShift, totalSize - pos));
526 slices[sliceIdx] = s1;
527 slicesEntries++;
528 return s1;
529 }
530 }
531
532 /**
533 * Return the <i>next</i> mapped {@link ByteBuffer} slice from the current {@link #position()},
534 * implicitly setting {@link #position(long)} to the start of the returned <i>next</i> slice,
535 * see {@link #currentSlice()}.
536 * <p>
537 * If no subsequent slice is available, {@code null} is being returned.
538 * </p>
539 * @throws IOException if a buffer slice operation failed.
540 */
541 public final synchronized ByteBuffer nextSlice() throws IOException {
542 if ( sliceIdx < sliceCount - 1 ) {
543 flushSlice(sliceIdx, synchronous);
544 sliceIdx++;
545 final ByteBuffer slice = currentSlice();
546 slice.position( 0 );
547 return slice;
548 } else {
549 return null;
550 }
551 }
552
553 /**
554 * Releases the mapped {@link ByteBuffer} slices.
555 * @throws IOException if a buffer slice operation failed.
556 */
557 public final synchronized void flushSlices() throws IOException {
558 if( null != slices ) {
559 for(int i=0; i<sliceCount; i++) {
560 flushSlice(i, synchronous);
561 }
562 }
563 if( MappedByteBufferInputStream.DEBUG ) {
564 this.dbgDump("FlushSlices", System.err);
565 }
566 }
567
568 synchronized void syncSlice(final ByteBuffer s) throws IOException {
569 syncSlice(s, synchronous);
570 }
571 synchronized void syncSlice(final ByteBuffer s, final boolean syncBuffer) throws IOException {
572 if( syncBuffer && null != s && FileChannel.MapMode.READ_WRITE == mmode ) {
573 try {
574 ((MappedByteBuffer)s).force();
575 } catch( final Throwable t ) {
576 // On Windows .. this may happen, like:
577 // java.io.IOException: The process cannot access the file because another process has locked a portion of the file
578 // at java.nio.MappedByteBuffer.force0(Native Method)
579 // at java.nio.MappedByteBuffer.force(MappedByteBuffer.java:203)
580 if( DEBUG ) {
581 System.err.println("Caught "+t.getMessage());
582 t.printStackTrace();
583 }
584 }
585 }
586 }
587 private synchronized void flushSlice(final int i, final boolean syncBuffer) throws IOException {
588 final ByteBuffer s = slices[i];
589 if ( null != s ) {
590 if( CacheMode.FLUSH_NONE != cmode ) {
591 slices[i] = null; // trigger slice GC
592 slicesEntries--;
593 if( CacheMode.FLUSH_PRE_HARD == cmode ) {
594 if( !cleanBuffer(s, syncBuffer) ) {
595 // buffer already synced in cleanBuffer(..) if requested
596 slices2GC[i] = new WeakReference<ByteBuffer>(s);
597 slices2GCEntries++;
598 }
599 } else {
600 syncSlice(s, syncBuffer);
601 slices2GC[i] = new WeakReference<ByteBuffer>(s);
602 slices2GCEntries++;
603 }
604 } else {
605 syncSlice(s, syncBuffer);
606 }
607 }
608 }
609 private synchronized void cleanAllSlices(final boolean syncBuffers) throws IOException {
610 if( null != slices ) {
611 for(int i=0; i<sliceCount; i++) {
612 cleanSlice(i, syncBuffers);
613 }
614 if( 0 != slicesEntries || 0 != slices2GCEntries ) { // FIXME
615 final String err = "mappedSliceCount "+slicesEntries+", slices2GCEntries "+slices2GCEntries;
616 dbgDump(err+": ", System.err);
617 throw new InternalError(err);
618 }
619 }
620 }
621
622 private synchronized void cleanSlice(final int i, final boolean syncBuffer) throws IOException {
623 final ByteBuffer s1 = slices[i];
624 final ByteBuffer s2;
625 {
626 final WeakReference<ByteBuffer> ref = slices2GC[i];
627 slices2GC[i] = null;
628 if( null != ref ) {
629 slices2GCEntries--;
630 s2 = ref.get();
631 } else {
632 s2 = null;
633 }
634 }
635 if( null != s1 ) {
636 slices[i] = null;
637 slicesEntries--;
638 cleanBuffer(s1, syncBuffer);
639 if( null != s2 ) {
640 throw new InternalError("XXX");
641 }
642 } else if( null != s2 ) {
643 cleanBuffer(s2, syncBuffer);
644 }
645 }
646 private synchronized boolean cleanBuffer(final ByteBuffer mbb, final boolean syncBuffer) throws IOException {
647 syncSlice(mbb, syncBuffer);
648 if( !mbb.isDirect() ) {
649 return false;
650 }
651 if( !Buffers.Cleaner.clean(mbb) && CacheMode.FLUSH_PRE_HARD == cmode ) {
652 cmode = CacheMode.FLUSH_PRE_SOFT;
653 return false;
654 } else {
655 return true;
656 }
657 }
658
659 /**
660 * Return the used {@link CacheMode}.
661 * <p>
662 * If a desired {@link CacheMode} is not available, it may fall back to an available one at runtime,
663 * see {@link CacheMode#FLUSH_PRE_HARD}.<br>
664 * This evaluation only happens if the {@link CacheMode} != {@link CacheMode#FLUSH_NONE}
665 * and while attempting to flush an unused buffer slice.
666 * </p>
667 */
668 public final synchronized CacheMode getCacheMode() { return cmode; }
669
670 /**
671 * Returns the total size in bytes of the {@link InputStream}
672 * <pre>
673 * <code>0 <= {@link #position()} <= {@link #length()}</code>
674 * </pre>
675 */
676 // @Override
677 public final synchronized long length() {
678 return totalSize;
679 }
680
681 /**
682 * Returns the number of remaining available bytes of the {@link InputStream},
683 * i.e. <code>{@link #length()} - {@link #position()}</code>.
684 * <pre>
685 * <code>0 <= {@link #position()} <= {@link #length()}</code>
686 * </pre>
687 * <p>
688 * In contrast to {@link InputStream}'s {@link #available()} method,
689 * this method returns the proper return type {@code long}.
690 * </p>
691 * @throws IOException if a buffer slice operation failed.
692 */
693 public final synchronized long remaining() throws IOException {
694 return 0 < refCount ? totalSize - position() : 0;
695 }
696
697 /**
698 * <i>See {@link #remaining()} for an accurate variant.</i>
699 * <p>
700 * {@inheritDoc}
701 * </p>
702 * @throws IOException if a buffer slice operation failed.
703 */
704 @Override
705 public final synchronized int available() throws IOException {
706 final long available = remaining();
707 return available <= Integer.MAX_VALUE ? (int)available : Integer.MAX_VALUE;
708 }
709
710 /**
711 * Returns the absolute position of the {@link InputStream}.
712 * <pre>
713 * <code>0 <= {@link #position()} <= {@link #length()}</code>
714 * </pre>
715 * @throws IOException if a buffer slice operation failed.
716 */
717 // @Override
718 public final synchronized long position() throws IOException {
719 if( 0 < refCount ) {
720 return ( (long)sliceIdx << sliceShift ) + currentSlice().position();
721 } else {
722 return 0;
723 }
724 }
725
726 /**
727 * Sets the absolute position of the {@link InputStream} to {@code newPosition}.
728 * <pre>
729 * <code>0 <= {@link #position()} <= {@link #length()}</code>
730 * </pre>
731 * @param newPosition The new position, which must be non-negative and &le; {@link #length()}.
732 * @return this instance
733 * @throws IOException if a buffer slice operation failed or stream is {@link #close() closed}.
734 */
735 // @Override
736 public final synchronized MappedByteBufferInputStream position( final long newPosition ) throws IOException {
737 checkOpen();
738 if ( totalSize < newPosition || 0 > newPosition ) {
739 throw new IllegalArgumentException("new position "+newPosition+" not within [0.."+totalSize+"]");
740 }
741 final int preSlice = sliceIdx;
742
743 if ( totalSize == newPosition ) {
744 // EOF, pos == maxPos + 1
745 sliceIdx = Math.max(0, sliceCount - 1); // handle zero size
746 if( preSlice != sliceIdx ) {
747 flushSlice(preSlice, synchronous);
748 }
749 final ByteBuffer s = currentSlice();
750 s.position( s.capacity() );
751 } else {
752 sliceIdx = (int)( newPosition >>> sliceShift );
753 if( preSlice != sliceIdx ) {
754 flushSlice(preSlice, synchronous);
755 }
756 currentSlice().position( (int)( newPosition - ( (long)sliceIdx << sliceShift ) ) );
757 }
758 return this;
759 }
760 private final synchronized void position2( final long newPosition ) throws IOException {
761 if ( totalSize == newPosition ) {
762 // EOF, pos == maxPos + 1
763 sliceIdx = Math.max(0, sliceCount - 1); // handle zero size
764 final ByteBuffer s = currentSlice();
765 s.position( s.capacity() );
766 } else {
767 sliceIdx = (int)( newPosition >>> sliceShift );
768 currentSlice().position( (int)( newPosition - ( (long)sliceIdx << sliceShift ) ) );
769 }
770 }
771
772 @Override
773 public final boolean markSupported() {
774 return true;
775 }
776
777 /**
778 * {@inheritDoc}
779 * <p>
780 * <i>Parameter {@code readLimit} is not used in this implementation,
781 * since the whole file is memory mapped and no read limitation occurs.</i>
782 * </p>
783 */
784 @Override
785 public final synchronized void mark( final int readlimit ) {
786 if( 0 < refCount ) {
787 try {
788 mark = position();
789 } catch (final IOException e) {
790 throw new RuntimeException(e); // FIXME: oops
791 }
792 }
793 }
794
795 /**
796 * {@inheritDoc}
797 * @throws IOException if this stream has not been marked,
798 * a buffer slice operation failed or stream has been {@link #close() closed}.
799 */
800 @Override
801 public final synchronized void reset() throws IOException {
802 checkOpen();
803 if ( mark == -1 ) {
804 throw new IOException("mark not set");
805 }
806 position( mark );
807 }
808
809 /**
810 * {@inheritDoc}
811 * @throws IOException if a buffer slice operation failed or stream is {@link #close() closed}.
812 */
813 @Override
814 public final synchronized long skip( final long n ) throws IOException {
815 checkOpen();
816 if( 0 > n ) {
817 return 0;
818 }
819 final long pos = position();
820 final long rem = totalSize - pos; // remaining
821 final long s = Math.min( rem, n );
822 position( pos + s );
823 return s;
824 }
825
826 @Override
827 public final synchronized int read() throws IOException {
828 checkOpen();
829 ByteBuffer slice = currentSlice();
830 if ( !slice.hasRemaining() ) {
831 if ( null == ( slice = nextSlice() ) ) {
832 return -1;
833 }
834 }
835 return slice.get() & 0xFF;
836 }
837
838 @Override
839 public final synchronized int read( final byte[] b, final int off, final int len ) throws IOException {
840 checkOpen();
841 if (b == null) {
842 throw new NullPointerException();
843 } else if( off < 0 ||
844 len < 0 ||
845 off > b.length ||
846 off + len > b.length ||
847 off + len < 0
848 ) {
849 throw new IndexOutOfBoundsException("offset "+off+", length "+len+", b.length "+b.length);
850 } else if ( 0 == len ) {
851 return 0;
852 }
853 final long totalRem = remaining();
854 if ( 0 == totalRem ) {
855 return -1;
856 }
857 final int maxLen = (int)Math.min( totalRem, len );
858 int read = 0;
859 while( read < maxLen ) {
860 ByteBuffer slice = currentSlice();
861 int currRem = slice.remaining();
862 if ( 0 == currRem ) {
863 if ( null == ( slice = nextSlice() ) ) {
864 throw new InternalError("Unexpected EOT");
865 }
866 currRem = slice.remaining();
867 }
868 final int currLen = Math.min( maxLen - read, currRem );
869 slice.get( b, off + read, currLen );
870 read += currLen;
871 }
872 return maxLen;
873 }
874
875 /**
876 * Perform similar to {@link #read(byte[], int, int)}
877 * with {@link ByteBuffer} instead of byte array.
878 * @param b the {@link ByteBuffer} sink, data is written at current {@link ByteBuffer#position()}
879 * @param len the number of bytes to read
880 * @return the number of bytes read, -1 for EOS
881 * @throws IOException if a buffer slice operation failed or stream has been {@link #close() closed}.
882 */
883 // @Override
884 public final synchronized int read(final ByteBuffer b, final int len) throws IOException {
885 checkOpen();
886 if (b == null) {
887 throw new NullPointerException();
888 } else if (len < 0 || len > b.remaining()) {
889 throw new IndexOutOfBoundsException("length "+len+", b "+b);
890 } else if ( 0 == len ) {
891 return 0;
892 }
893 final long totalRem = remaining();
894 if ( 0 == totalRem ) {
895 return -1;
896 }
897 final int maxLen = (int)Math.min( totalRem, len );
898 int read = 0;
899 while( read < maxLen ) {
900 ByteBuffer slice = currentSlice();
901 int currRem = slice.remaining();
902 if ( 0 == currRem ) {
903 if ( null == ( slice = nextSlice() ) ) {
904 throw new InternalError("Unexpected EOT");
905 }
906 currRem = slice.remaining();
907 }
908 final int currLen = Math.min( maxLen - read, currRem );
909 if( slice.hasArray() && b.hasArray() ) {
910 System.arraycopy(slice.array(), slice.arrayOffset() + slice.position(),
911 b.array(), b.arrayOffset() + b.position(),
912 currLen);
913 slice.position( slice.position() + currLen );
914 b.position( b.position() + currLen );
915 } else if( currLen == currRem ) {
916 b.put(slice);
917 } else {
918 final int _limit = slice.limit();
919 slice.limit(currLen);
920 try {
921 b.put(slice);
922 } finally {
923 slice.limit(_limit);
924 }
925 }
926 read += currLen;
927 }
928 return maxLen;
929 }
930}
An InputStream implementation based on an underlying FileChannel's memory mapped ByteBuffer,...
final synchronized void flush(final boolean metaData)
Similar to OutputStream#flush(), synchronizes all mapped buffers from local storage via MappedByteBuf...
final synchronized void setFileResizeOp(final FileResizeOp fileResizeOp)
final synchronized long length()
Returns the total size in bytes of the InputStream.
final synchronized long position()
Returns the absolute position of the InputStream.
final synchronized int available()
See remaining() for an accurate variant.
final synchronized void setLength(final long newTotalSize)
Resize the underlying FileChannel's size and adjusting this instance via accordingly.
final synchronized void flushSlices()
Releases the mapped ByteBuffer slices.
final synchronized CacheMode getCacheMode()
Return the used CacheMode.
final synchronized long remaining()
Returns the number of remaining available bytes of the InputStream, i.e.
final synchronized ByteBuffer currentSlice()
Return the mapped ByteBuffer slice at the current position().
final synchronized ByteBuffer nextSlice()
Return the next mapped ByteBuffer slice from the current position(), implicitly setting position(long...
final synchronized int read(final byte[] b, final int off, final int len)
final synchronized void setSynchronous(final boolean s)
Enable or disable synchronous mode.
MappedByteBufferInputStream(final FileChannel fileChannel)
Creates a new instance using the given FileChannel, read-only mapping mode, CacheMode#FLUSH_PRE_HARD ...
final synchronized void notifyLengthChange(final long newTotalSize)
Notify this instance that the underlying FileChannel's size has been changed and adjusting this insta...
final synchronized MappedByteBufferOutputStream getOutputStream(final FileResizeOp fileResizeOp)
Returns a new MappedByteBufferOutputStream instance sharing all resources of this input stream,...
MappedByteBufferInputStream(final FileChannel fileChannel, final FileChannel.MapMode mmode, final CacheMode cmode, final int sliceShift)
Creates a new instance using the given FileChannel.
final synchronized int read(final ByteBuffer b, final int len)
Perform similar to read(byte[], int, int) with ByteBuffer instead of byte array.
static final int DEFAULT_SLICE_SHIFT
Default slice shift, i.e.
final synchronized MappedByteBufferInputStream position(final long newPosition)
Sets the absolute position of the InputStream to newPosition.
final synchronized void mark(final int readlimit)
MappedByteBufferInputStream(final FileChannel fileChannel, final FileChannel.MapMode mmode, final CacheMode cmode)
Creates a new instance using the given FileChannel, given mapping-mode, given cache-mode and the DEFA...
final synchronized boolean getSynchronous()
Return synchronous mode.
An OutputStream implementation based on an underlying FileChannel's memory mapped ByteBuffer.
FLUSH_PRE_HARD
Hard flush the previous lazily cached buffer slice when caching the next buffer slice,...
FLUSH_PRE_SOFT
Soft flush the previous lazily cached buffer slice when caching the next buffer slice,...
FLUSH_NONE
Keep all previous lazily cached buffer slices alive, useful for hopping readers, i....
File resize interface allowing a file to change its size, e.g.