Skip to content

Java IO - 基础

字节流与字符流

从数据传输方式或者说是运输方式角度看,可以将 IO 类分为:

  • 字节流
  • 字符流

字节是个计算机看的,字符才是给人看的

字节流和字符流的区别

  • 字节流读取单个字节,字符流读取单个字符(一个字符根据编码的不同,对应的字节也不同,如 UTF-8 编码是 3 个字节,中文编码是 2 个字节。)
  • 字节流用来处理二进制文件(图片、MP3、视频文件),字符流用来处理文本文件(可以看做是特殊的二进制文件,使用了某种编码,人可以阅读)。

简而言之,字节是个计算机看的,字符才是给人看的。

字节转字符 Input/OutputStream Reader/Writer

编码就是把字符转换为字节,而解码是把字节重新组合成字符。

如果编码和解码过程使用不同的编码方式那么就出现了乱码。

  • GBK 编码中,中文字符占 2 个字节,英文字符占 1 个字节;
  • UTF-8 编码中,中文字符占 3 个字节,英文字符占 1 个字节;
  • UTF-16be 编码中,中文字符和英文字符都占 2 个字节。

Java IO 中的设计模式(装饰者模式)

装饰者模式

装饰者(Decorator)和具体组件(ConcreteComponent)都继承自组件(Component),具体组件的方法实现不需要依赖于其它对象,而装饰者组合了一个组件,这样它可以装饰其它装饰者或者具体组件。所谓装饰,就是把这个装饰者套在被装饰者之上,从而动态扩展被装饰者的功能。装饰者的方法有一部分是自己的,这属于它的功能,然后调用被装饰者的方法实现,从而也保留了被装饰者的功能。可以看到,具体组件应当是装饰层次的最低层,因为只有具体组件的方法实现不需要依赖于其它对象。

IO 装饰者模式

以 InputStream 为例,

  • InputStream 是抽象组件;
  • FileInputStream 是 InputStream 的子类,属于具体组件,提供了字节流的输入操作;
  • FilterInputStream 属于抽象装饰者,装饰者用于装饰组件,为组件提供额外的功能。例如 BufferedInputStream 为 FileInputStream 提供缓存的功能。

实例化一个具有缓存功能的字节流对象时,只需要在 FileInputStream 对象上再套一层 BufferedInputStream 对象即可。

java
    FileInputStream fileInputStream = new FileInputStream(filePath);
    BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);

DataInputStream 装饰者提供了对更多数据类型进行输入的操作,比如 int、double 等基本类型。

IO常见类的使用

Java 的 I/O 大概可以分成以下几类:

  • 磁盘操作: File
  • 字节操作: InputStream 和 OutputStream
  • 字符操作: Reader 和 Writer
  • 对象操作: Serializable
  • 网络操作: Socket

File相关

File 类可以用于表示文件和目录的信息,但是它不表示文件的内容。

递归地列出一个目录下所有文件:

    public static void listAllFiles(File dir) {
        if (dir == null || !dir.exists()) {
            return;
        }
        if (dir.isFile()) {
            System.out.println(dir.getName());
            return;
        }
        for (File file : dir.listFiles()) {
            listAllFiles(file);
        }
    }

字节流相关

    public static void copyFile(String src, String dist) throws IOException {
    
        FileInputStream in = new FileInputStream(src);
        FileOutputStream out = new FileOutputStream(dist);
        byte[] buffer = new byte[20 * 1024];
    
        // read() 最多读取 buffer.length 个字节
        // 返回的是实际读取的个数
        // 返回 -1 的时候表示读到 eof,即文件尾
        while (in.read(buffer, 0, buffer.length) != -1) {
            out.write(buffer);
        }
    
        in.close();
        out.close();
    }

实现逐行输出文本文件的内容

    public static void readFileContent(String filePath) throws IOException {
    
        FileReader fileReader = new FileReader(filePath);
        BufferedReader bufferedReader = new BufferedReader(fileReader);
    
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            System.out.println(line);
        }
    
        // 装饰者模式使得 BufferedReader 组合了一个 Reader 对象
        // 在调用 BufferedReader 的 close() 方法时会去调用 Reader 的 close() 方法
        // 因此只要一个 close() 调用即可
        bufferedReader.close();
    }

序列化 & Serializable & transient

序列化就是将一个对象转换成字节序列,方便存储和传输。

  • 序列化: ObjectOutputStream.writeObject()
  • 反序列化: ObjectInputStream.readObject()

不会对静态变量进行序列化,因为序列化只是保存对象的状态,静态变量属于类的状态。

Serializable

序列化的类需要实现 Serializable 接口,它只是一个标准,没有任何方法需要实现,但是如果不去实现它的话而进行序列化,会抛出异常。

java
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        A a1 = new A(123, "abc");
        String objectFile = "file/a1";
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(new FileOutputStream(objectFile));
        objectOutputStream.writeObject(a1);
        objectOutputStream.close();
    
        ObjectInputStream objectInputStream = new ObjectInputStream(new FileInputStream(objectFile));
        A a2 = (A) objectInputStream.readObject();
        objectInputStream.close();
        System.out.println(a2);
    }
    
    private static class A implements Serializable {
        private int x;
        private String y;
    
        A(int x, String y) {
            this.x = x;
            this.y = y;
        }
    
        @Override
        public String toString() {
            return "x = " + x + " " + "y = " + y;
        }
    }

transient

transient 关键字可以使一些属性不会被序列化。

ArrayList 中存储数据的数组 elementData 是用 transient 修饰的,因为这个数组是动态扩展的,并不是所有的空间都被使用,因此就不需要所有的内容都被序列化。通过重写序列化和反序列化方法,使得可以只序列化数组中有内容的那部分数据。

    private transient Object[] elementData;

Java 中的网络支持:

  • InetAddress: 用于表示网络上的硬件资源,即 IP 地址;
  • URL: 统一资源定位符;
  • Sockets: 使用 TCP 协议实现网络通信;
  • Datagram: 使用 UDP 协议实现网络通信。

InetAddress

没有公有的构造函数,只能通过静态方法来创建实例。

java
    InetAddress.getByName(String host);
    InetAddress.getByAddress(byte[] address);

URL

可以直接从 URL 中读取字节流数据。

java
    public static void main(String[] args) throws IOException {
    
        URL url = new URL("http://www.baidu.com");
    
        /* 字节流 */
        InputStream is = url.openStream();
    
        /* 字符流 */
        InputStreamReader isr = new InputStreamReader(is, "utf-8");
    
        /* 提供缓存功能 */
        BufferedReader br = new BufferedReader(isr);
    
        String line;
        while ((line = br.readLine()) != null) {
            System.out.println(line);
        }
    
        br.close();
    }

Sockets

  • ServerSocket: 服务器端类
  • Socket: 客户端类
  • 服务器和客户端通过 InputStream 和 OutputStream 进行输入输出。

![image][]

Datagram

  • DatagramSocket: 通信类
  • DatagramPacket: 数据包类

Java IO - 源码: InputStream

InputStream 抽象类

InputStream 类

java
    public abstract int read() 
    // 读取数据
    
    public int read(byte b[]) 
    // 将读取到的数据放在 byte 数组中,该方法实际上是根据下面的方法实现的,off 为 0,len 为数组的长度
    
    public int read(byte b[], int off, int len) 
    // 从第 off 位置读取 len 长度字节的数据放到 byte 数组中,流是以 -1 来判断是否读取结束的
    
    public long skip(long n) 
    // 跳过指定个数的字节不读取,想想看电影跳过片头片尾
    
    public int available() 
    // 返回可读的字节数量
    
    public void close() 
    // 读取完,关闭流,释放资源
    
    public synchronized void mark(int readlimit) 
    // 标记读取位置,下次还可以从这里开始读取,使用前要看当前流是否支持,可以使用 markSupport() 方法判断
    
    public synchronized void reset() 
    // 重置读取位置为上次 mark 标记的位置
    
    public boolean markSupported() 
    // 判断当前流是否支持标记流,和上面两个方法配套使用

源码实现

InputStream

java
    public abstract class InputStream implements Closeable {
        private static final int SKIP_BUFFER_SIZE = 2048;  //用于skip方法,和skipBuffer相关
        private static byte[] skipBuffer;    // skipBuffer is initialized in skip(long), if needed.
        
        //从输入流中读取下一个字节,
        //正常返回0-255,到达文件的末尾返回-1
        //在流中还有数据,但是没有读到时该方法会阻塞(block)
        //Java IO和New IO的区别就是阻塞流和非阻塞流
        //抽象方法!不同的子类不同的实现!
        public abstract int read() throws IOException;  
        
        //将流中的数据读入放在byte数组的第off个位置先后的len个位置中
        //返回值为放入字节的个数。
        //这个方法在利用抽象方法read,某种意义上简单的Templete模式。
        public int read(byte b[], int off, int len) throws IOException {
            //检查输入是否正常。一般情况下,检查输入是方法设计的第一步
            if (b == null) {    
                throw new NullPointerException();
            } else if (off < 0 || len < 0 || len > b.length - off) {
                 throw new IndexOutOfBoundsException();
            } else if (len == 0) {
                 return 0;
            }        
            //读取下一个字节
            int c = read();
            //到达文件的末端返回-1
            if (c == -1) {    return -1;   }
            //返回的字节downcast 
            b[off] = (byte)c;
            //已经读取了一个字节 
            int i = 1;                                                                        
            try {
                //最多读取len个字节,所以要循环len次
                for (; i < len ; i++) {
                    //每次循环从流中读取一个字节
                    //由于read方法阻塞,
                    //所以read(byte[],int,int)也会阻塞
                    c = read();
                    //到达末尾,理所当然返回-1 
                    if (c == -1) {            break;           } 
                    //读到就放入byte数组中
                    b[off + i] = (byte)c;
                }
            } catch (IOException ee) {     }
            return i;
        }
    
         //利用上面的方法read(byte[] b)
        public int read(byte b[]) throws IOException {
            return read(b, 0, b.length);
         }                          
        //方法内部使用的、表示要跳过的字节数目,
         public long skip(long n) throws IOException {
            long remaining = n;    
            int nr;
            if (skipBuffer == null)
            //初始化一个跳转的缓存
            skipBuffer = new byte[SKIP_BUFFER_SIZE];                   
            //本地化的跳转缓存
            byte[] localSkipBuffer = skipBuffer;          
            //检查输入参数,应该放在方法的开始 
            if (n <= 0) {    return 0;      }                             
            //一共要跳过n个,每次跳过部分,循环 
            while (remaining > 0) {                                      
                nr = read(localSkipBuffer, 0, (int) Math.min(SKIP_BUFFER_SIZE, remaining));
                //利用上面的read(byte[],int,int)方法尽量读取n个字节 
                //读到流的末端,则返回
                if (nr < 0) {  break;    }
                //没有完全读到需要的,则继续循环
                remaining -= nr;                                       
            }       
            return n - remaining;//返回时要么全部读完,要么因为到达文件末端,读取了部分
        }
        //查询流中还有多少可以读取的字节
        //该方法不会block。在java中抽象类方法的实现一般有以下几种方式: 
        //1.抛出异常(java.util);2.“弱”实现。像上面这种。子类在必要的时候覆盖它。
        //3.“空”实现。
        public int available() throws IOException {           
            return 0;
        }
        //关闭当前流、同时释放与此流相关的资源
        //关闭当前流、同时释放与此流相关的资源
        public void close() throws IOException {}
        //markSupport可以查询当前流是否支持mark
        public synchronized void mark(int readlimit) {}
        //对mark过的流进行复位。只有当流支持mark时才可以使用此方法。
        public synchronized void reset() throws IOException {
    
                       throw new IOException("mark/reset not supported");
    
    }
        //查询是否支持mark
        //绝大部分不支持,因此提供默认实现,返回false。子类有需要可以覆盖。
        public boolean markSupported() {           
            return false;
        }
    }

FilterInputStream

java
    public class FilterInputStream extends InputStream {
        //装饰器的代码特征: 被装饰的对象一般是装饰器的成员变量
        protected volatile InputStream in; //将要被装饰的字节输入流
        protected FilterInputStream(InputStream in) {   //通过构造方法传入此被装饰的流
            this.in = in;
         }
        //下面这些方法,完成最小的装饰――0装饰,只是调用被装饰流的方法而已
        public int read() throws IOException {
            return in.read();
        }
        public int read(byte b[]) throws IOException {
            return read(b, 0, b.length);
         }
        public int read(byte b[], int off, int len) throws IOException {
            return in.read(b, off, len);
         }
        public long skip(long n) throws IOException {
            return in.skip(n);
        }
        public int available() throws IOException {
            return in.available();
        }
        public void close() throws IOException {
            in.close();
        }
        public synchronized void mark(int readlimit) {
            in.mark(readlimit);
         }
        public synchronized void reset() throws IOException {
            in.reset();
        }
        public boolean markSupported() {
            return in.markSupported();
        }
    }

ByteArrayInputStream

java
    public class ByteArrayInputStream extends InputStream {
        protected byte buf[];                //内部的buffer,一般通过构造器输入
        protected int pos;                   //当前位置的cursor。从0至byte数组的长度。
        //byte[pos]就是read方法读取的字节
        protected int mark = 0;           //mark的位置。
        protected int count;              //流中字节的数目。
    
        //构造器,从一个byte[]创建一个ByteArrayInputStream
         public ByteArrayInputStream(byte buf[]) {
            //初始化流中的各个成员变量
            this.buf = buf;              
            this.pos = 0;
            this.count = buf.length;
         }
        //构造器
         public ByteArrayInputStream(byte buf[], int offset, int length) {                
            this.buf = buf;
            this.pos = offset; //与上面不同
            this.count = Math.min(offset + length, buf.length);
            this.mark = offset; //与上面不同
        }
        //从流中读取下一个字节
         public synchronized int read() {           
            //返回下一个位置的字节//流中没有数据则返回-1
            return (pos < count) ? (buf[pos++] & 0xff) : -1; 
        }
        // ByteArrayInputStream要覆盖InputStream中可以看出其提供了该方法的实现
        //某些时候,父类不能完全实现子类的功能,父类的实现一般比较通用。
        //当子类有更有效的方法时,我们会覆盖这些方法。
        public synchronized int read(byte b[], int off, int len) {
            //首先检查输入参数的状态是否正确
            if(b==null){ 
                throw new NullPointerException();
            } else if (off < 0 || len < 0 || len > b.length - off) {
                throw new IndexOutOfBoundsException();
            }
            if (pos >= count) {             return -1;             }
            if (pos + len > count) {      len = count - pos;         }
            if (len <= 0) {           return 0;     }
            //java中提供数据复制的方法
            //出于速度的原因!他们都用到System.arraycopy方法
            System.arraycopy(buf, pos, b, off, len); 
            pos += len;
            return len;
        }
        //下面这个方法,在InputStream中也已经实现了。
        //但是当时是通过将字节读入一个buffer中实现的,好像效率低了一点。
        //比InputStream中的方法简单、高效
        public synchronized long skip(long n) {
            //当前位置,可以跳跃的字节数目
            if (pos + n > count) {    n = count - pos;       }       
            //小于0,则不可以跳跃
             if (n < 0) {       return 0;     }   
            //跳跃后,当前位置变化 
            pos += n;                                                                              
            return n;
        }   
        //查询流中还有多少字节没有读取。 
    public synchronized int available() {
        return count - pos;
        }
        //ByteArrayInputStream支持mark所以返回true
        public boolean markSupported() {                   
    
                       return true;
    
    }  
        //在流中当前位置mark。 
        public void mark(int readAheadLimit) {            
            mark = pos;
         }
        //重置流。即回到mark的位置。
        public synchronized void reset() {
            pos = mark;
        }
        //关闭ByteArrayInputStream不会产生任何动作。
        public void close() throws IOException {   }
    }

BufferedInputStream

java
    public class BufferedInputStream extends FilterInputStream {
        private static int defaultBufferSize = 8192;    //默认缓存的大小
        protected volatile byte buf[];  //内部的缓存
        protected int count;     //buffer的大小
        protected int pos;      //buffer中cursor的位置
        protected int markpos = -1;    //mark的位置
        protected int marklimit;     //mark的范围
        
        //原子性更新。和一致性编程相关
        private static final    
        AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
        AtomicReferenceFieldUpdater.newUpdater (BufferedInputStream.class, byte[].class,"buf");
        //检查输入流是否关闭,同时返回被包装流
         private InputStream getInIfOpen() throws IOException {
            InputStream input = in;
            if (input == null)    throw new IOException("Stream closed");
            return input;
        }
        //检查buffer的状态,同时返回缓存
        private byte[] getBufIfOpen() throws IOException {                       
            byte[] buffer = buf;
            //不太可能发生的状态
            if (buffer == null)   throw new IOException("Stream closed");    
            return buffer;
        }
    
        //构造器
    public BufferedInputStream(InputStream in) {
        //指定默认长度的buffer
            this(in, defaultBufferSize);                                                              
        }
        //构造器
        public BufferedInputStream(InputStream in, int size) {                           
            super(in);
            //检查输入参数
            if(size<=0){
                throw new IllegalArgumentException("Buffer size <= 0");
            }
            //创建指定长度的buffer
            buf = new byte[size]; 
        }
        //从流中读取数据,填充如缓存中。
        private void fill() throws IOException {
            //得到buffer
            byte[] buffer = getBufIfOpen();                            
                if (markpos < 0)
                //mark位置小于0,此时pos为0
                pos = 0;
                //pos大于buffer的长度 
                else if (pos >= buffer.length) 
                if (markpos > 0) {        
                    int sz = pos - markpos;                                                           
              System.arraycopy(buffer, markpos, buffer, 0, sz);
                    pos = sz;
                    markpos = 0;
                } else if (buffer.length >= marklimit) {                 
                    //buffer的长度大于marklimit时,mark失效
                    markpos = -1; 
                    //丢弃buffer中的内容 
              pos = 0;                                    
                }else{                                                                         
                    //buffer的长度小于marklimit时对buffer扩容
                    int nsz = pos * 2;
                    if (nsz > marklimit) 
                        nsz = marklimit;//扩容为原来的2倍,太大则为marklimit大小
                        byte nbuf[] = new byte[nsz];                   
                        //将buffer中的字节拷贝如扩容后的buf中 
                        System.arraycopy(buffer, 0, nbuf, 0, pos);        
                        if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {    
                        //在buffer在被操作时,不能取代此buffer
                        throw new IOException("Stream closed");
                    }
                    //将新buf赋值给buffer
                    buffer = nbuf;                                                               
                }
                count = pos;
                int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
                if (n > 0)     count = n + pos;
            }
            //读取下一个字节
            public synchronized int read() throws IOException { 
            //到达buffer的末端 
            if (pos >= count) {                                                                 
                //就从流中读取数据,填充buffer 
                fill();                                                                                
           //读过一次,没有数据则返回-1
                if (pos >= count)  return -1;                                
            }
            //返回buffer中下一个位置的字节
            return getBufIfOpen()[pos++] & 0xff;                           
        }
        //将数据从流中读入buffer中
         private int read1(byte[] b, int off, int len) throws IOException {                 
            int avail = count - pos; //buffer中还剩的可读字符 
            //buffer中没有可以读取的数据时
            if(avail<=0){                                                                                        
                //将输入流中的字节读入b中
                if (len >= getBufIfOpen().length && markpos < 0) {             
                    return getInIfOpen().read(b, off, len);
                }
                fill();//填充 
                avail = count - pos;
                if (avail <= 0) return -1;
            }
            //从流中读取后,检查可以读取的数目
            int cnt = (avail < len) ? avail : len;                                                  
            //将当前buffer中的字节放入b的末端
            System.arraycopy(getBufIfOpen(), pos, b, off, cnt);            
            pos += cnt;
            return cnt;
        }
    
        public synchronized int read(byte b[], int off, int len)throws IOException {
            getBufIfOpen();                                                                                     
         // 检查buffer是否open
            //检查输入参数是否正确
            if ((off | len | (off + len) | (b.length - (off + len))) < 0) {           
                throw new IndexOutOfBoundsException();
            } else if (len == 0) {
                return 0;
            }
            int n = 0;
            for (;;) {
                int nread = read1(b, off + n, len - n);
                if (nread <= 0)     return (n == 0) ? nread : n;
                n += nread;
                if (n >= len)     return n;
                InputStream input = in;
                if (input != null && input.available() <= 0)     return n;
            }
        }
    
        public synchronized long skip(long n) throws IOException {
            // 检查buffer是否关闭
            getBufIfOpen();                                        
            //检查输入参数是否正确
            if (n <= 0) {    return 0;      }                 
            //buffered中可以读取字节的数目
            long avail = count - pos;                    
            //可以读取的小于0,则从流中读取
            if (avail <= 0) {                                          
                //mark小于0,则mark在流中 
                if (markpos <0)  return getInIfOpen().skip(n);     
                // 从流中读取数据,填充缓冲区。
                fill();                                  
                //可以读的取字节为buffer的容量减当前位置
                avail = count - pos;                                   
                if (avail <= 0)     return 0;
            }       
            long skipped = (avail < n) ? avail : n;      
            pos += skipped;                                       
         //当前位置改变
            return skipped;
        }
        //该方法不会block!返回流中可以读取的字节的数目。
        //该方法的返回值为缓存中的可读字节数目加流中可读字节数目的和
        public synchronized int available() throws IOException {
            return getInIfOpen().available() + (count - pos);                 
        }
    
        //当前位置处为mark位置
        public synchronized void mark(int readlimit) {  
            marklimit = readlimit;
            markpos = pos;
        }
    
        public synchronized void reset() throws IOException {
            // 缓冲去关闭了,肯定就抛出异常!程序设计中经常的手段
            getBufIfOpen();
            if (markpos < 0)     throw new IOException("Resetting to invalid mark");
            pos = markpos;
        }
        //该流和ByteArrayInputStream一样都支持mark
        public boolean markSupported() {           
            return true;
        }
        //关闭当前流同时释放相应的系统资源。
        public void close() throws IOException {
            byte[] buffer;
            while ( (buffer = buf) != null) {
                if (bufUpdater.compareAndSet(this, buffer, null)) {
                InputStream input = in;
                in = null;
                if (input != null)    input.close();
                return;
            }
            // Else retry in case a new buf was CASed in fill()
            }
        }
    }

PipedInputStream

java
    public class PipedInputStream extends InputStream {
        //标识有读取方或写入方关闭
        boolean closedByWriter = false;                                                             
        volatile boolean closedByReader = false;    
        //是否建立连接
        boolean connected = false;                                                                     
        //标识哪个线程
        Thread readSide;                                                                                             
        Thread writeSide;
        //缓冲区的默认大小
        protected static final int PIPE_SIZE = 1024;                         
        //缓冲区
        protected byte buffer[] = new byte[PIPE_SIZE];                 
        //下一个写入字节的位置。0代表空,in==out代表满
        protected int in = -1;               
        //下一个读取字节的位置
    protected int out = 0;    
              
        //给定源的输入流
        public PipedInputStream(PipedOutputStream src) throws IOException {                
            connect(src);
        }
        //默认构造器,下部一定要connect源
        public PipedInputStream() {    }                                                
        //连接输入源
        public void connect(PipedOutputStream src) throws IOException {               
            //调用源的connect方法连接当前对象
            src.connect(this);                                                                           
        }
        //只被PipedOuputStream调用
        protected synchronized void receive(int b) throws IOException {                  
            //检查状态,写入
            checkStateForReceive();                                                                                 
            //永远是PipedOuputStream
            writeSide = Thread.currentThread();                                                      
            //输入和输出相等,等待空间
            if (in == out)     awaitSpace();                                                           
            if (in < 0) {
                in = 0;
                out = 0;
            }
            //放入buffer相应的位置
            buffer[in++] = (byte)(b & 0xFF);                                                             
            //in为0表示buffer已空
            if (in >= buffer.length) {      in = 0;         }                                             
        }
    
        synchronized void receive(byte b[], int off, int len)  throws IOException {
            checkStateForReceive();
            //从PipedOutputStream可以看出
            writeSide = Thread.currentThread();                                   
            int bytesToTransfer = len;
            while (bytesToTransfer > 0) {
                //满了,会通知读取的;空会通知写入
                if (in == out)    awaitSpace();                                
                int nextTransferAmount = 0;
                if (out < in) {
                    nextTransferAmount = buffer.length - in;
                } else if (in < out) {
                    if (in == -1) {
                    in = out = 0;
                    nextTransferAmount = buffer.length - in;
                } else {
                    nextTransferAmount = out - in;
                }
            }
            if (nextTransferAmount > bytesToTransfer)  nextTransferAmount = bytesToTransfer;
            assert(nextTransferAmount > 0);
            System.arraycopy(b, off, buffer, in, nextTransferAmount);
            bytesToTransfer -= nextTransferAmount;
            off += nextTransferAmount;
            in += nextTransferAmount;
            if (in >= buffer.length) {     in = 0;      }
            }
        }
        //检查当前状态,等待输入
        private void checkStateForReceive() throws IOException {                           
            if (!connected) {
                throw new IOException("Pipe not connected");
            } else if (closedByWriter || closedByReader) {
                throw new IOException("Pipe closed");
            } else if (readSide != null && !readSide.isAlive()) {
                throw new IOException("Read end dead");
            }
        }
        
        //Buffer已满,等待一段时间
        private void awaitSpace() throws IOException {                                              
            //in==out表示满了,没有空间
            while (in == out) {                                                                                             
                //检查接受端的状态
                checkStateForReceive();                                                                       
                //通知读取端
                notifyAll();                                                                                  
                try {
                    wait(1000);
                } catch (InterruptedException ex) {
                    throw new java.io.InterruptedIOException();
                }
            }
        }
    
        //通知所有等待的线程()已经接受到最后的字节
        synchronized void receivedLast() {                  
            closedByWriter = true;                             //
            notifyAll();
        }
    
        public synchronized int read()  throws IOException {
            //检查一些内部状态
            if (!connected) {                                                                              
                throw new IOException("Pipe not connected");
            } else if (closedByReader) {
                throw new IOException("Pipe closed");
            } else if (writeSide != null && !writeSide.isAlive()&& !closedByWriter && (in < 0)) {
                throw new IOException("Write end dead");
            }
            //当前线程读取
            readSide = Thread.currentThread();                                            
            //重复两次? ? ? 
              int trials = 2;                                                                               
            while (in < 0) {
            //输入断关闭返回-1
            if (closedByWriter) {              return -1;        }                 
                //状态错误
                if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {         
                    throw new IOException("Pipe broken");
                }
                notifyAll();        // 空了,通知写入端可以写入 try {
                    wait(1000);
                } catch (InterruptedException ex) {
                    throw new java.io.InterruptedIOException();
                }
            }
            int ret = buffer[out++] & 0xFF;                                                                         if (out >= buffer.length) {             out = 0;                }
            //没有任何字节
            if (in == out) {           in = -1;                 }                             
            return ret;
        }
    
        public synchronized int read(byte b[], int off, int len)  throws IOException {
            //检查输入参数的正确性
             if (b == null) {                                                                                 
                throw new NullPointerException();
            } else if (off < 0 || len < 0 || len > b.length - off) {
                throw new IndexOutOfBoundsException();
            } else if (len == 0) {
                return 0;
            }
            //读取下一个
             int c = read();                                                                                 
            //已经到达末尾了,返回-1
            if (c < 0) {    return -1;       }                                            
            //放入外部buffer中
            b[off] = (byte) c;                                                                    
            //return-len
            int rlen = 1;                                                                            
            //下一个in存在,且没有到达len
            while ((in >= 0) && (--len > 0)) {                                          
                //依次放入外部buffer
                b[off + rlen] = buffer[out++];                                         
                rlen++;
                //读到buffer的末尾,返回头部
                if (out >= buffer.length) {         out = 0;           }        
                //读、写位置一致时,表示没有数据
                if (in == out) {     in = -1;      }               
            }
            //返回填充的长度
            return rlen;                                                                            
        }
        //返回还有多少字节可以读取
        public synchronized int available() throws IOException {             
            //到达末端,没有字节
            if(in < 0)
                return 0;                                                                                         
            else if(in == out)
                //写入的和读出的一致,表示满
                return buffer.length;                                                               
            else if (in > out)
                //写入的大于读出
                return in - out;                                                                                 
            else
                //写入的小于读出的
                return in + buffer.length - out;                                                
        }
        //关闭当前流,同时释放与其相关的资源
        public void close()  throws IOException {                
            //表示由输入流关闭
            closedByReader = true;                                             
            //同步化当前对象,in为-1
            synchronized (this) {     in = -1;    }        
        }
            
    }

Java IO - 源码: OutputStream

OutputStream 抽象类

OutputStream 类

java
    public abstract void write(int b)
    // 写入一个字节,可以看到这里的参数是一个 int 类型,对应上面的读方法,int 类型的 位,只有低 位才写入,高 位将舍弃。
    
    public void write(byte b[])
    // 将数组中的所有字节写入,和上面对应的 read() 方法类似,实际调用的也是下面的方法。
    
    public void write(byte b[], int off, int len)
    // 将 byte 数组从 off 位置开始,len 长度的字节写入
    
    public void flush()
    // 强制刷新,将缓冲中的数据写入
    
    public void close()
    // 关闭输出流,流被关闭后就不能再输出数据了

略...