Java多线程 异步模式之生产者消费者模式

之前学习过的同步模式是保护性暂停,存在一定的缺陷,我们当初举例是两个线程,线程A收信人,线程B送信人,使用一个信箱充当一个中间过渡的介质。在多任务的情况下,收信人存在多个,发信人就得存在多个,而且只有当送信者将信件送到指定的收信人手中,一个任务才算完成,这中间的操作是同步进行的,因此属于同步模式下的多线程任务。

弊端也很明显,实际情况下,送信者只需要把信件放入信箱通知收信人即可,而不需要等到收信人拿到信才算结束,因为送信人把信收到信箱通知收信人后还可以接着送下一份信件,这中间送信人的时间没有得到充分的利用。


啰嗦了一大段话,接着进入正题:我们可以使用消息队列的方式存储信件,队列是一种先进先出的数据结构。例如三个送信人t1,t2,t3将信件要送给t4,只需要把信件依次存入队列,再通知t4取件即可。

要点:

  • 与前面的保护性暂停的GuardedObject不同,不需要产生结果和消费结果一一对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅仅负责产生结果数据,不需要关心数据该如何处理,而消费者只专心处理结果数据
  • 消息队列的容量是有限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK 中的各种堵塞队列采用的就是这个模式
public class TestMessageQueue  { 

    public static void main(String[] args) throws InterruptedException { 

        //队列初始容量2
        MessageQueue queue = new MessageQueue(2);
        //生产者
        for (int i = 0; i < 3; i++) { 
            int temp = i;
            new Thread(()->{ 
                Message message = new Message(temp,"情书:" + temp);
                try { 
                    queue.put(message);
                    log.debug("送出" + temp + "封情书");
                } catch (InterruptedException e) { 
                    e.printStackTrace();
                }
            },"线程"+ i).start();
        }

        //消费者
        while(true){ 
            //每隔1秒去看一下
            Thread.sleep(1000);
            Message take = queue.take();
            log.debug("收到" + take.getMessage());
        }

    }
}

class Message{ 
    private int id;
    private Object message;

    public Message(int id, Object message) { 
        this.id = id;
        this.message = message;
    }

    public int getId() { 
        return id;
    }

    public Object getMessage() { 
        return message;
    }
}

@Slf4j(topic = "c.MessageQueue")
class MessageQueue{ 
    private LinkedList<Message> queue;
    private int capacity;

    public MessageQueue(int capacity) { 
        this.capacity = capacity;
        this.queue = new LinkedList<>();
    }

    public Message take() throws InterruptedException { 
        synchronized (queue){ 
            while(queue.size() == 0){ 
                log.debug("没货了 明儿再来");
                queue.wait();
            }
            Message message = queue.removeLast();
            queue.notifyAll();
            return message;
        }
    }

    public void put(Message message) throws InterruptedException { 
        synchronized (queue){ 
            while(queue.size() == capacity){ 
                log.debug("库存满了 ..");
                queue.wait();
            }
            queue.addLast(message);
            queue.notifyAll();
        }

    }
}

主线程负责消费,输出:

送进行三封信,主线程每隔一秒取一次,取完3封信后,主线程进行wait()状态,除非有生产者再次送信。

学习资料:https://www.bilibili.com/video/BV16J411h7Rd?p=108&spm_id_from=pageDriver

页面下部广告