/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.commons.compress.compressors.snappy;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.commons.compress.compressors.CompressorOutputStream;
import org.apache.commons.compress.compressors.lz77support.Parameters;
import org.apache.commons.compress.utils.ByteUtils;

/**
 * CompressorOutputStream for the framing Snappy format.
 *
 * <p>Based on the "spec" in the version "Last revised: 2013-10-25"</p>
 *
 * <p>Incoming data is collected in an internal buffer of at most 65536
 * bytes; each time the buffer is full (or when {@link #finish} is
 * called) its content is compressed and written as one chunk.</p>
 *
 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a>
 * @since 1.14
 * @NotThreadSafe
 */
public class FramedSnappyCompressorOutputStream extends CompressorOutputStream {
    // see spec:
    // > However, we place an additional restriction that the uncompressed data
    // > in a chunk must be no longer than 65536 bytes. This allows consumers to
    // > easily use small fixed-size buffers.
    //
    // Despite its name this constant is used as the size of the buffer that
    // holds the *uncompressed* data of the chunk currently being assembled.
    private static final int MAX_COMPRESSED_BUFFER_SIZE = 1 << 16;

    private final OutputStream out;
    private final Parameters params;
    private final PureJavaCrc32C checksum = new PureJavaCrc32C();
    // used in one-arg write method
    private final byte[] oneByte = new byte[1];
    // uncompressed data of the chunk currently being assembled
    private final byte[] buffer = new byte[MAX_COMPRESSED_BUFFER_SIZE];
    // number of valid bytes in buffer
    private int currentIndex = 0;

    private final ByteUtils.ByteConsumer consumer;

    /**
     * Constructs a new output stream that compresses
     * snappy-framed-compressed data to the specified output stream.
     * @param out the OutputStream to which to write the compressed data
     * @throws IOException if writing the signature fails
     */
    public FramedSnappyCompressorOutputStream(final OutputStream out) throws IOException {
        this(out, SnappyCompressorOutputStream.createParameterBuilder(SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE)
             .build());
    }

    /**
     * Constructs a new output stream that compresses
     * snappy-framed-compressed data to the specified output stream.
     * @param out the OutputStream to which to write the compressed data
     * @param params parameters used to fine-tune compression, in
     * particular to balance compression ratio vs compression speed.
     * @throws IOException if writing the signature fails
     */
    public FramedSnappyCompressorOutputStream(final OutputStream out, final Parameters params) throws IOException {
        this.out = out;
        this.params = params;
        consumer = new ByteUtils.OutputStreamByteConsumer(out);
        // the signature is written eagerly, before any data arrives
        out.write(FramedSnappyCompressorInputStream.SZ_SIGNATURE);
    }

    /**
     * Buffers a single byte for compression.
     * @param b the byte to write; only the low eight bits are used
     * @throws IOException if flushing a full buffer fails
     */
    @Override
    public void write(final int b) throws IOException {
        oneByte[0] = (byte) (b & 0xff);
        write(oneByte, 0, 1);
    }

    /**
     * Buffers {@code len} bytes of {@code data} starting at {@code off},
     * writing a compressed chunk every time the internal buffer becomes
     * full.
     * @param data the data to write
     * @param off offset of the first byte to write
     * @param len number of bytes to write
     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is
     * negative or {@code off + len} exceeds {@code data.length}
     * @throws IOException if writing a chunk fails
     */
    @Override
    public void write(final byte[] data, int off, int len) throws IOException {
        // written as a subtraction so the check itself cannot overflow
        if (off < 0 || len < 0 || len > data.length - off) {
            throw new IndexOutOfBoundsException("off=" + off + ", len=" + len
                + ", data.length=" + data.length);
        }
        while (len > 0) {
            // top up the buffer so every chunk but the last one is full
            final int copyLen = Math.min(len, buffer.length - currentIndex);
            System.arraycopy(data, off, buffer, currentIndex, copyLen);
            off += copyLen;
            len -= copyLen;
            currentIndex += copyLen;
            if (currentIndex == buffer.length) {
                flushBuffer();
            }
        }
    }

    /**
     * Writes all remaining data and closes the underlying stream.
     *
     * <p>The underlying stream is closed even if writing the remaining
     * data fails.</p>
     * @throws IOException if an error occurs
     */
    @Override
    public void close() throws IOException {
        try {
            finish();
        } finally {
            out.close();
        }
    }

    /**
     * Compresses all remaining data and writes it to the stream,
     * doesn't close the underlying stream.
     * @throws IOException if an error occurs
     */
    public void finish() throws IOException {
        if (currentIndex > 0) {
            flushBuffer();
        }
    }

    /**
     * Compresses the buffered data and writes it as a single chunk:
     * chunk type, three byte little-endian length, masked checksum of
     * the uncompressed data, compressed data. Does nothing if the buffer
     * is empty.
     */
    private void flushBuffer() throws IOException {
        if (currentIndex == 0) {
            // never emit a chunk without any payload
            return;
        }
        // compress before touching the real stream so a failure here
        // doesn't leave a dangling chunk header behind
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (OutputStream o = new SnappyCompressorOutputStream(baos, currentIndex, params)) {
            o.write(buffer, 0, currentIndex);
        }
        final byte[] compressed = baos.toByteArray();
        out.write(FramedSnappyCompressorInputStream.COMPRESSED_CHUNK_TYPE);
        writeLittleEndian(3, compressed.length + 4L /* CRC */);
        writeCrc();
        out.write(compressed);
        currentIndex = 0;
    }

    /**
     * Writes the {@code numBytes} low-order bytes of {@code num} to the
     * underlying stream in little-endian order.
     */
    private void writeLittleEndian(final int numBytes, final long num) throws IOException {
        ByteUtils.toLittleEndian(consumer, num, numBytes);
    }

    /**
     * Writes the masked checksum of the currently buffered (uncompressed)
     * data as four little-endian bytes and resets the checksum.
     */
    private void writeCrc() throws IOException {
        checksum.update(buffer, 0, currentIndex);
        writeLittleEndian(4, mask(checksum.getValue()));
        checksum.reset();
    }

    /**
     * Masks a checksum value: combines {@code x >> 15} with
     * {@code x << 17}, adds
     * {@link FramedSnappyCompressorInputStream#MASK_OFFSET} and keeps the
     * low 32 bits of the result.
     *
     * <p>For an input that fits into 32 bits the first step amounts to a
     * 32 bit rotation to the right by 15 bits, the surplus high bits are
     * removed by the final truncation.</p>
     * @param x the unmasked checksum
     * @return the masked checksum as a value between 0 and 2^32 - 1
     */
    static long mask(long x) {
        // ugly, maybe we should just have used ints and deal with the
        // overflow
        x = ((x >> 15) | (x << 17));
        x += FramedSnappyCompressorInputStream.MASK_OFFSET;
        x &= 0xffffFFFFL;
        return x;
    }
}