LOADING...

加载过慢请开启缓存(浏览器默认开启)

loading

多线程与分布式

2024/4/4 后端

多线程与分布式 [周介绍]

  • 线程池的基本使用、特点、注意点
  • ThreadLoacal的基本使用、原理和注意事项
  • 分布式基础、核心概念
  • docker的下载、安装和基本命令
  • 独立制作docker容器
  • Nginx的安装、基本使用和使用命令
  • 使用Nginx搭建文件服务
  • 消息队列RabbitMQ的核心概念queue、message和exchange
  • RabbitMQ的四种交换机模式
  • SpringBoot整合RabbitMQ案例

线程池 —— 治理线程的法宝

  • 线程池的自我介绍
  • 创建和停止线程池
  • 常见线程池的特点和用法
  • 任务太多,怎么拒绝
  • 钩子方法,给线程池加点料
  • 实现原理、源码分析
  • 使用线程池的注意点
线程池的自我介绍
  • 线程池的重要性 [可以复用我们的线程]
  • 什么是”池” – 软件中的”池”,可以理解为计划经济
  • 如果不适用线程池,每个任务都新开一个线程处理
    • 一个线程
    • for循环创建线程
    • 当任务数量上升到1000
      这样开销太大,我们希望有固定数量的线程,来执行这1000个线程,这样就避免了反复创建并销毁线程所带来的开销问题[多了会报错 内存不足异常]
threadpool/ForLoop.java
package threadpool;
/**
 * 描述:     TODO
 */
public class ForLoop {

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(new Task());
            thread.start();
        }
    }

    static class Task implements Runnable {
        @Override
        public void run() {
            System.out.println("执行了任务");
        }
    }
}
为什么要使用线程池
  • 问题一:反复创建线程开销大
  • 问题二:过多的线程会占用太多内存
  • 解决以上两个问题的思路
    • 用少量的线程——避免内存占用过多
    • 让这部分线程都保持工作,且可以反复执行任务——避免生命周期的损耗
线程池的好处
  • 加快响应速度
  • 合理利用CPU和内存
  • 统一管理
线程池使用应用的场合
  • 服务器(Tomcat)接收到大量请求时,使用线程池技术是非常合适的,它可以大大减少现成的创建和销毁次数,提高服务器的工作效率
  • 实际上,在开发中,如果需要创建5个以上的线程,那么就可以使用线程池来管理

线程增减的时机

创建和停止线程池
  • 线程池构造方法的参数
  • 线程池应该手动创建还是自动创建
  • 线程池里的线程数量设定为多少比较合适
  • 停止线程池的方法
线程池构造方法的参数
参数名 类型 含义
corePoolSize int 核心线程数
maxPoolSize int 最大线程数
keepAliveTime long 保持存活时间
workQueue BlockingQueue 任务存储队列
threadFactory ThreadFactory 当线程池需要新的线程的时候,会使用threadFactory来生成新的线程
Handler RejectedExecutionHandler 由于线程池无法接收你所提交的任务的拒绝策略
参数中的corePoolSize和maxPoolSize
  • corePoolSize指的是核心线程数
    线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新的线程去执行任务

  • 最大量maxPoolSize
    在核心线程数的基础上,格外增加的线程数的上限

添加线程规则
  1. 如果线程数小于corePoolSize,创建一个新线程来运行新任务
  2. 如果线程数等于(或大于) corePoolSize但少于maximumPoolSize,则将任务放入队列
  3. 如果队列已满,并且线程数小于maxPoolSize,则创建一个新线程
  4. 如果队列已满,并且线程数大于或等于maxPoolSize,则拒绝

  • 是否需要增加线程的判断顺序是:
    • corePoolSize
    • workQueue
    • maxPoolSize
举个例子
  • 线程池:核心池大小为5,最大池大小为10,队列为100
  • 因为线程中的请求最多会创建5个,然后任务将被添加到队列中,直到达到100。当队列已满时,将创建新的线程maxPoolSize,最多到10个线程,如果再来任务,就拒绝
增减线程的特点
  • 通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池
  • 线程池希望保持较小的线程数,并且只有在负载变得很大时才增加它
  • 通过设置maximumPoolSize为很高的值,可以允许线程池容纳任意数量的并发任务
  • 只有在队列填满时才创建多于corePoolSize的线程,如果使用的是无界队列,那么线程数就不会超过corePoolSize

线程存活时间和工作队列

keepAliveTime
  • 如果线程池当前的线程多余corePoolSize,那么如果多余的线程空闲时间超过keepAliveTime,它们就会被终止
ThreadFactory 用来创建线程
  • 默认使用Executors.defaultThreadFactory()
  • 创建出来的线程都在同一个线程组
  • 如果自己指定ThreadFactory,那么就可以改变线程名、线程组、优先级、是否是守护线程等
工作队列
  • 有三种最常见的队列类型
    • 直接交接:SynchronousQueue
    • 无界队列:LinkedBlockingQueue
    • 有界队列:ArrayBlockingQueue

自动创建线程池的风险

线程池应该手动创建还是自动创建
  • 手动创建更好,因为这样可以更加明确线程池的运行规则,避免资源耗尽的风险
    自动创建线程池(即直接调用JDK封装好的构造方法) 可能会带来哪些问题?
  • newFixedThreadPool
    • 容易造成大量内存占用,可能会导致OOM
threadpool/FixedThreadPoolThread.java
package threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolThread {
    public static void main(String[] args) { //核心线程数量
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new Task());
        }
    }
}
class Task implements Runnable{

    @Override
    public void run() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println(Thread.currentThread().getName());
    }
}
=======================================================================
pool-1-thread-3
pool-1-thread-4
pool-1-thread-1
pool-1-thread-2
    
pool-1-thread-3
pool-1-thread-1
pool-1-thread-4
pool-1-thread-2
因为核心线程数量只规定了4个 这有这四个线程跑程序
threadpool/FixedThreadPoolOOM.java
package threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 演示nexFixedThreadPool出错的情况
 */
public class FixedThreadPoolOOM {
    private static ExecutorService executorService = Executors.newFixedThreadPool(1);

    public static void main(String[] args) {
        for (int i = 0; i < Integer.MAX_VALUE; i++){
            executorService.execute(new SubThread());
        }
    }
}
class SubThread implements Runnable{

    @Override
    public void run() {
        try{//一直睡觉 不让它结束
            Thread.sleep(1000000000);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}
=======================================================================
报错:java.lang.OutOfMemoryError:GC overhead limilt exceeded
  • newSingleThreadExecutor [单独的线程]
    • 当请求堆积的时候,可能会占用大量的内存
threadpool/SingleThreadExecutor.java
package threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutor {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new Task());
        }
    }
}
=======================================================================
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
  • CachedThreadPool [可缓存线程池]
    • 特点:具有自动回收多余线程的功能
    • 弊端在于第二个参数maximumPoolSize被设置为了Integer.MAX_VALUE, 这可能会创建数量非常多的线程,甚至导致OOM
threadpool/CachedThreadPool.java
package threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPool {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new Task());
        }
    }
}
  • newScheduledThreadPool [跟时间相关的]
threadpool/ScheduledThreadPool.java
package threadpool;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPool {
    public static void main(String[] args) {
        ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10);
        threadPool.schedule(new Task(), 5, TimeUnit.SECONDS);
        threadPool.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS);//每隔3s运行
    }
}
  • 正确的创建线程池的方法
    • 根据不同的业务场景,设置线程池参数
    • 比如:内存有多大,给线程取什么名字等等

线程池里的线程数量设定为多少比较合适?

  • CPU密集型(加密、计算hash等):最佳线程数为CPU核心数的1-2倍左右
  • 耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于CPU核心数很多倍
    参考Brain Goetz推荐的计算方法:
    线程数 = CPU核心数 × (1 + 平均等待时间 / 平均工作时间)

对比各种线程池的特点

常见的线程池的特点

以上4种线程池的构造方法的参数
Parameter FixedThreadPool CachedThreadPool ScheduledThreadPool SingleThreaded
corePoolSize constructor-arg 0 constructor-arg 1
maxPoolSize same as corePoolSize Integer.MAX_VALUE Integer.MAX_VALUE 1
keepAliveTime 0 seconds 60 seconds 0 seconds 0 seconds

阻塞队列分析

  • FixedThreadPool和SingleThreadExecutor的Queue是LinedBlockingQueue
  • CachedThreadPool使用的是Queue是SynchronousQueue
  • ScheduledThreadPool使用延迟队列DelayedWorkQueue
workStealingPool是JDK1.8加入的
  • 这个线程池和之前的都有很大不同
  • 子任务
  • 窃取 [并行执行]

如何正确关闭线程池

停止线程池的正确方法
  • shutdown [再给就拒绝 新的任务不会增加了]
threadpool/Shutdown.java
package threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 演示关闭线程池
 */
public class Shutdown {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //任务往线程池中提交
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        executorService.shutdown();//绅士的暂停 再去提交的任务就不会增加了
        executorService.execute(new ShutDownTask());
    }
}
class ShutDownTask implements Runnable{
    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

=================================================================================
pool-1-thread-4
pool-1-thread-9
pool-1-thread-10
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task threadpool.ShutDownTask@61bbe9ba rejected from java.util.concurrent.ThreadPoolExecutor@610455d6[Shutting down, pool size = 10, active threads = 10, queued tasks = 970, completed tasks = 20]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at threadpool.Shutdown.main(Shutdown.java:18)
pool-1-thread-8
pool-1-thread-1
pool-1-thread-3
  • isShutdown [判断是否进入停止状态]
threadpool/Shutdown.java
package threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 演示关闭线程池
 */
public class Shutdown {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //任务往线程池中提交
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        System.out.println(executorService.isShutdown());//false
        executorService.shutdown();//绅士的暂停 再去提交的任务就不会增加了
        System.out.println(executorService.isShutdown());//true 已经结束了
    }
}
class ShutDownTask implements Runnable{
    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
  • isTerminated [线程停止返回 整个程序执行完毕]
threadpool/Shutdown.java
package threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 演示关闭线程池
 */
public class Shutdown {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //任务往线程池中提交
        for (int i = 0; i < 100; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        System.out.println(executorService.isShutdown());//false
        executorService.shutdown();//绅士的暂停 再去提交的任务就不会增加了
        System.out.println(executorService.isShutdown());//true 已经结束了
        System.out.println(executorService.isTerminated());
        Thread.sleep(10000);
        System.out.println(executorService.isTerminated());
    }
}
class ShutDownTask implements Runnable{
    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
==========================================================================
pool-1-thread-5
pool-1-thread-10
false
true
false
pool-1-thread-2
pool-1-thread-6
pool-1-thread-6
pool-1-thread-2
true
  • awaitTermination [测试一段时间内线程会不会完全停止的方法 等待的时间进程被打乱了 等待的时间到了]
threadpool/Shutdown.java
package threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 演示关闭线程池
 */
public class Shutdown {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //任务往线程池中提交
        for (int i = 0; i < 100; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500); //七秒钟之内是否完全运行完毕了
        executorService.shutdown();
        boolean b = executorService.awaitTermination(7L, TimeUnit.SECONDS);
        System.out.println(b);
    }
}
class ShutDownTask implements Runnable{
    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
  • shutdownNow [立刻关闭线程池]
///正在执行的和不要关闭 正在等待的内容直接返回 如何优雅编写?
package threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 演示关闭线程池
 */
public class Shutdown {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //任务往线程池中提交
        for (int i = 0; i < 100; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500); //七秒钟之内是否完全运行完毕了
        executorService.shutdownNow();//暴力关闭线程
        //这是正在队列中的数据 取了个List集合 都到runnableList里面了 要给它一个交代
        List<Runnable> runnableList = executorService.shutdownNow();
    }
}
class ShutDownTask implements Runnable{
    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName()+"被中断了");
        }
    }
}

=================================================================
pool-1-thread-6
pool-1-thread-4
pool-1-thread-10
pool-1-thread-8
pool-1-thread-3被中断了
pool-1-thread-1被中断了
pool-1-thread-8被中断了
pool-1-thread-2被中断了

暂停和恢复线程池

  • 拒绝时机
    • 当Executor关闭时,提交新任务被拒绝
    • 以及当Executor对最大线程和工作队列容量使用有限边界并且已经饱和
4种拒绝策略
  • AbortPolicy [直接抛出异常]
  • DiscardPolicy [默默的丢弃]
  • DiscardOldestPolicy [丢弃最老的]
  • CallerRunsPolicy [誰提交任务誰去跑(避免了业务损失 提交任务速度下降 给了线程池缓冲时间)]
钩子方法,给线程池加点料
  • 每个任务执行前后
  • 日志、统计
  • 代码演示
threadpool/PauseableThreadPool.java
package threadpool;

import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 演示每个任务执行前后放钩子函数
 */
public class PauseableThreadPool extends ThreadPoolExecutor {
    private final ReentrantLock lock = new ReentrantLock();
    private Condition unpaused = lock.newCondition();
    private boolean isPaused;
    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override //在执行任务之前都会调用这个函数
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        //如果检测到就暂停休息
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private void pause(){
        lock.lock();
        try{
            isPaused = true;
        }finally {
            lock.unlock();
        }
    }
    public void resume(){
        lock.lock();
        try{
            isPaused = false;
            unpaused.signalAll(); //唤醒全部
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("我被执行");
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 10000; i++) {
            pauseableThreadPool.execute(runnable);
        }
            Thread.sleep(1500);
            pauseableThreadPool.pause();
            System.out.println("线程池被暂停了");
            Thread.sleep(1500);
            pauseableThreadPool.resume();
            System.out.println("线程池被恢复了");
    }
}

线程池实现复用的原因

  • 线程池组成部分
    • 线程池管理器
    • 工作线程
    • 任务队列
    • 任务接口(Task)
Executor家族
  • 线程池、ThreadPoolExecutor、ExecutorService、Executor、Executors等这么多和线程池相关的类,都是什么关系?

    Executor ← ExecutorService ← AbstractExecutorService ← ThreadPoolExecutor

线程池实现任务复用的原理
  • 相同线程执行不同任务

线程池状态和使用注意点

线程池状态
状态 含义
RUNNING 接收新任务并处理排队任务
SHUTDOWN 不接受新任务,但处理排队任务
STOP 不接受新任务,也不处理排队任务,并中断正在进行的任务
TIDYING 所有任务都已终止,workerCount为零时,线程会转换到TIDYING状态,并将运行terminate()钩子方法
TERMINATED terminate()运行完成
使用线程池的注意点
  • 避免任务堆积
  • 避免线程数过度增加
  • 排查线程泄露

ThreadLocal

ThreadLocal的两个作用
  • 让某个需要用到的对象在线程间隔离
  • 在任何方法中都可以轻松获取到该对象 .get

ThreadLocal类用来提供线程内部的局部变量。这种变量在多线程环境下访问(通过get和set方法访问) 时能保证各个线程的变量相对独立于其他线程内的变量。ThreadLocal实例通常来说都是ptivate static类型的,用于关联线程和线程上下文,它的作用是:提供线程内的局部变量,不同的线程之间不会相互干扰,这种变量在线程的生命周期内起作用,减少同一个线程内多个函数或组件之间一些公共变量传递的复杂性。

  • 定义:提供线程局部变量;一个线程局部变量在多个线程中,分别由独立的值(副本)
  • 特点:简单(开箱即用)、快速(无格外开销)、安全(线程安全)
  • 场景:多线程场景(资源持有、线程一致性、并发计算、线程安全等场景)
  • 实现原理:Java中使用哈希表实现
  • 应用范围:几乎所有提供多线程特征的语言
  • 设计者追求开箱即用的体验

ThreadLocal API

  • 构造函数 ThreadLocal()
  • 初始化 initialValue()
    • 该方法会返回当前线程对应的”初始值”,这是一个延迟加载的方法,只有在调用get的时候,才会触发
    • 当线程第一次使用get方法访问变量时,将调用此方法
    • 每个线程最多调用一次此方法,但如果已经调用了remove()后,在调用get(),则可以再次调用此方法
    • 如果不重写本方法,这个方法会返回null。一般使用匿名内部类的方法来**重写initialValue()**方法
  • 访问器 T get():得到这个线程对应的value,如果是首次调用get()则会调用initialize来得到这个值 // void set(T t):为这个线程设置一个新值
  • 回收 void remove(): 删除这个线程
根据共享对象的生成时机不同,选择initialValue或set来保存对象
场景一:initialValue
  • 在ThreadLocal第一次get的时候把对象給初始化出来,对象的初始化时机可以由我们控制
场景二:set
  • 如果需要保存到ThreadLocal里的对象的生成时机不由我们随机控制,例如拦截器生成的用户信息
  • 用ThreadLocal.set直接放到我们的ThreadLocal中去,以便后续使用
使用ThreadLocal带来的好处
  • 达到线程安全
  • 不需要加锁,提高执行效率
  • 更高效地利用内存、节省开销
  • 避免传参的繁琐
threadpool/ThreadLocalAPI.java
package threadpool;

public class ThreadLocalAPI {
    public static ThreadLocal<Long> x = new ThreadLocal(){
        @Override
        protected Long initialValue(){
            System.out.println("Inital Value run...");
            return Thread.currentThread().getId();
        }
    };

    public static void main(String[] args) {
        new Thread() {
            @Override
            public void run() {
                System.out.println(x.get());
            }
        }.start(); //调用一次initialValue 每个线程单独拥有一个
        x.set(101l);
        x.remove();//清空线程 结果为1 因为 Thread.currentThread().getId();
        System.out.println(x.get());//发现x.remove被移除了 结果去重新触发initialValue
        System.out.println(x.get());
    }
}
========================================================================
Inital Value run...
Inital Value run...
20
1
1

若注释掉x.remove则会出现结果:
Inital Value run...
101
101
20

ThreadLocal的4种核心场景

  • 持有资源——持有线程资源供线程的各个部分使用,全局获取,减少编程难度
  • 线程一致——帮助需要保持线程一致的资源(如数据库事务) 维护一致性,降低编程难度
  • 线程安全——帮助只考虑了单线程的程序库,无缝向多线程场景迁移

ThreadLocal并发场景分析01

例1 200QPS压测统计接口
  • 观察200QPS下Spring框架的执行情况
  • 目标:理解并发、竞争条件、临界区等概念
  • 代表场景:交易场景
并发、竞争条件和临界区

ThreadLocal并发场景

  • 并发:多个程序同时执行
  • 竞争条件:多个进程(线程)同时访问同一个内存资源,最终的执行结果依赖于多个进程执行时的精确时序
  • 临界区:访问共享内存的程序片段
StatController.java
package com.imooc.springbootlearn;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

@Controller //并发可能导致同时 数据错误 需要加★后面的代码
public class StatController {
    //★★ 避免用锁 导致当数据多的话线程缓慢
    static ThreadLocal<Integer> c = new ThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() {
            return 0;
        }
    };
    //★ 请求并发也要排队synchronized(但是不要轻易使用锁) 应该怎么办 见★★
    synchronized void _add() throws InterruptedException {
        Thread.sleep(100);
        c.set(c.get() + 1);
    }

    @RequestMapping("/stat")
    public Integer stat() {
        return c.get();
    }

    @RequestMapping("/add")
    public Integer add() throws InterruptedException {
//        Thread.sleep(100l);
//        c++;
        _add();
        return 1;
    }

}
SpringBootlearnApplication.java
package com.imooc.springbootlearn;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication(scanBasePackages="com.imooc")
public class SpringBootlearnApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBootlearnApplication.class, args);
    }
}
总结
  • 基于线程池模型synchronize(排队操作很危险)
  • 用ThreadLocal收集数据很快速且安全
  • 思考:如何在多个ThreadLocal中收集数据?

ThreadLocal场景分析——减少同步

ThreadLocal< T >同步
StatController.java    高效解决高并发
package com.imooc.springbootlearn;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import java.util.HashSet;

@Controller //并发可能导致同时 数据错误 需要加★后面的代码
public class StatController {

    static HashSet<Val<Integer>> set = new HashSet<>();
    synchronized static void addSet(Val<Integer> v){
        set.add(v);
    }

    //★★ 避免用锁 导致当数据多的话线程缓慢
    static ThreadLocal<Val<Integer>> c = new ThreadLocal<Val<Integer>>() {
        @Override
        protected Val<Integer> initialValue() {
            Val<Integer> v = new Val<>();
            v.set(0);
            //set.add(v);//容易产生同步问题 小锁一下
            return v;
        }
    };

    //★ 请求并发也要排队synchronized(但是不要轻易使用锁) 应该怎么办 见★★
/*synchronized*/void _add() throws InterruptedException {
        Thread.sleep(100); //高并发
        Val<Integer> v = c.get();
        v.set(v.get() + 1);
    }

    @RequestMapping("/stat")
    public Integer stat() {
        return set.stream().map(x -> x.get()).reduce((a, x) -> a + x).get();
    }

    @RequestMapping("/add")
    public Integer add() throws InterruptedException {
//        Thread.sleep(100l);
//        c++;
        _add();
        return 1;
    }
}
Val.java
package com.imooc.springbootlearn;

public class Val<T>{
    T v;
    public void set(T _v){
        v = _v;
    }
    public T get(){
        return v;
    }
}
总结
  • 完全避免同步()
  • 缩小同步范围(简单) + ThreadLocal解决问题
  • 思考:还可以用在哪些场景?
源码分析1-Quartz SimpleSemaphore

  • Quartz的SimpleSemaphore提供资源隔离(上锁)
  • SimpleSemaphore中的lockOwners(ThreadLocal)为重度锁操作前置过滤
  • 思考:学易,用难!
源码分析2-Mybatis框架保持连接池线程一致
什么是本地事务
  • A(Atomic)原子性,操作不可分割
  • C(Consistency)一致性,任何时刻数据都能保持一致
  • I(Isolation)隔离性,多事务并发执行的时许不影响结果
  • D(Durability)持久性,对数据结构的存储是永久的

Begin → [本地事务:(更新订单状态 →<订单状态更新成功>→发放资源) → 资源发放成功(提交) → Commit →<持久化>→DB。**若资源发放失败(回滚)**→Rollback]

源码分析03 Spring框架对分布式事务的支持

技术选型——实现自己的ThreadLocal

MyThreadLocal.java
package com.imooc.springbootlearn;

import java.util.HashMap;
import java.util.Objects;

class MyThreadLocal<T> {
    static HashMap<Thread, HashMap<MyThreadLocal<?>, Objects>> threadLocalMap = new HashMap<>();//这里会产生临界区

    synchronized static HashMap<MyThreadLocal<?>, Objects> getMap() { //这是锁临界区
        var thread = Thread.currentThread();
        if (!threadLocalMap.containsKey(thread)) {
            threadLocalMap.put(thread, new HashMap<MyThreadLocal<?>, Objects>());
        }
        return threadLocalMap.get(thread);
    }

    T value;

    protected T initialValue() {
        return null;
    }

    public T get() {
        var map = getMap();
        if (!map.containsKey(this)) {
            map.put(this, initialValue());
        }
        return (T) map.get(this);
    }

    public void set(T v){
        var map = getMap();
        map.put(this, v);
    }
}
Test.java
package com.imooc.springbootlearn;

public class Test {
    static MyThreadLocal<Long> v = new MyThreadLocal<Long>(){
        @Override
        protected Long initialValue() {
         return Thread.currentThread().getId();
        }
    };

    public static void main(String[] args) {
        for (Integer i = 0; i < 100; i++) {
            new Thread(()->{
                System.out.println(v.get());
            }).start();
        }
    }
}
问题
  • HashMap中直接存储了MyThreadLocal的引用,导致内存无法回收
  • 思考:可以用整数ID替代对MyThreadLocal的引用
MyThreadLocal.java
package com.imooc.springbootlearn;

import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

class MyThreadLocal<T> {
    static AtomicInteger atomic = new AtomicInteger(); //保证数据唯一性
    Integer threadLocalHash = atomic.addAndGet(0x61c88647); //这个值保证hash的更平均性
    static HashMap<Thread, HashMap<Integer, Objects>> threadLocalMap = new HashMap<>();//这里会产生临界区

    synchronized static HashMap<Integer, Objects> getMap() { //这是锁临界区
        var thread = Thread.currentThread();
        if (!threadLocalMap.containsKey(thread)) {
            threadLocalMap.put(thread, new HashMap<Integer, Objects>());
        }
        return threadLocalMap.get(thread);
    }

    T value;

    protected T initialValue() {
        return null;
    }

    public T get() {
        var map = getMap();
        if (!map.containsKey(this.threadLocalHash)) {
            map.put(this.threadLocalHash, initialValue());
        }
        return (T) map.get(this.threadLocalHash);
    }

    public void set(T v){
        var map = getMap();
        map.put(this.threadLocalHash, v);
    }
}
  • HashMap无限增加?初始空间分配是否合理?

HashTable

源码解读——哈希表实现ThreadLocal
什么是哈希表?

哈希表(散列 HashTable) 根据键(Key) 访问/设置内存中存储的位置的值

总结

  • 架构是严密且精确的东西(切记夸夸其谈)
  • 并发是一个很危险的场景,提高能力才会获得安全感
  • 保持怀疑,持续学习
  • 会用 → 场景查找 → 轻量实现 → 源码对照 → 场景沉淀
  • 程序架构:低耦合(独立)、高内聚(组合做到开箱即用)
  • 无论达到什么高度,要永远认为自己是个菜鸡

分布式章节

  • 什么是分布式
  • 分布式的作用
  • 分布式和单体结构的对比
  • CAP定理

  • 集群、分布式、微服务的区别

什么是分布式

  • 利用物理架构形成多个自治的处理元素,不共享主内存,但是通过发送信息合作。

  • 饭店初始的例子
    • 一个厨师 [单例]
    • 多个厨师
    • 术业有专攻:配菜师、洗菜工
  • 实际项目的演进过程

    • 一个项目,大而全
    • 多台机器,部署相同的应用
    • 分布式:权限系统、员工系统、请假系统

分布式的作用

为什么需要分布式
  • 实际工作中的痛点
    • 工程臃肿 [相互耦合 相互冲突]
    • 测试、上线繁琐
    • 开发效率低
单体应用的问题
  • 应用代码耦合严重,功能扩展难
  • 新需求开发交互周期长,测试工作量大
  • 新加入的开发同事需要很长时间才能熟悉系统
  • 升级维护也很困难(改动任何一点地方都要升级整个系统)
  • 系统性能提升艰难,可用性低,不稳定
分布式的好处
  • 增大系统容量
  • 加强系统可用[某个模块出bug 但不影响其他]
  • 因为模块化,所以系统模块重用度更高
  • 因为软件服务模块被拆分,开发和发布速度可以并行而变得更快
  • 系统扩展性更高
  • 团队协作流程也会得到改善
  • 技术升级

单体和分布式的对比

分布式和单体结构的对比
传统单体构架 分布式构架
新人的学习成本 业务逻辑成本高 架构逻辑成本高
部署、运维 容易 发布频繁,顺序复杂、运维难
隔离性 一损俱损,殃及鱼池 故障影响范围小
架构设计 难度低 难度指数级上升
系统性能 响应快、吞吐量小 响应慢、吞吐量大
测试成本 很高
技术多样性 技术单一且封闭 技术多样且开放
系统扩展性 扩展性差 扩展性很好
系统管理成本 成本低 成本高

CAP定理

CAP的重要性

分布式不可能同时满足三个条件

CAP理论是什么?
  • C(Consistency, 一致性):读操作是否总能读到前一个写操作的结果
  • A(Availability, 可用性):非故障节点应该在合理的时间内作出合理的响应(不是错误或超时的响应),但是可能不是最新的数据
  • P(Partition tolerance, 分区容错):当出现网络分区现象后,系统能够继续运行

CAP如何选择?
  • CP[支付宝]或者AP[超级跑跑系统维护]
  • 在什么场合,可用性高于一致性?
    • 网页必须要保障可用性(一定能看到最重要 是不是最新的不重要)和分区容错
    • 支付的时候一定要保障一致性(我可以保证不可用 但我不允许余额不一致)和分区容错
  • 合适的才是最好的

集群、分布式、微服务的区别

集群和分布式的区别
  • 分布式:一个业务分拆多个子业务,部署在不同的服务器上 [服务器之间要通信]
  • 集群:同一个业务,部署在多个服务器上 [五台机器可以不通信]
集群和微服务的区别
  • 集群:分散压力
  • 微服务:分散压力
微服务和分布式的区别
  • 微服务是架构设计方式 [逻辑架构]

  • 分布式是系统部署方式 [物理架构]

  • 微服务:是一种架构方式 [大的服务拆成小的服务 每个服务独立开发测试]

  • 分布式:主要强调部署的方式

Docker

  • 基本概念、用途、核心思想
  • Docker的组成、架构、重要概念
  • Docker的安装
  • 第一个Docker容器
  • 运用Nginx镜像,并访问到Docker容器内部 [从外部进行访问]
  • 制作自己的Docker容器,dockerfile实战

Docker的基本概念、用途、核心思想

  • Docker应用广泛
    京东618:15万个Docker实例,所有业务全部容器化
  • Docker是什么?
    [以前的图标是大鲸鱼拖着集装箱]
    • Docker是一个用来装程序及其环境的容器[类似于安装包.exe],属于Linux容器的一种封装,提供简单易用的容器使用接口。它是目前最流行的Linux容器解决方案
    • 比喻:客车可以装人,衣柜可以放衣服 [Windows下写的小游戏也可以放在Docker中]

为什么需要Docker

  • 环境配置的难题
  • 虚拟机[资源占用很多 模拟一套完整系统但步骤多(有些步骤无法跳过) 启动慢]
  • Docker的基础——Linux容器 [体积小 速度快 轻量级虚拟机]

Docker的用途

  • 提供统一的环境
  • 提供快速拓展、弹性伸缩的云服务 [解决双十一淘宝+天猫 数据量剧增问题]
  • 防止其他用户的进程把服务器资源占用过多 [程序相互之间可以隔离]
  • 部署简单 运维简单 节省服务器资源

Docker的特点

标准化
  • 运输方式(把程序和环境从一个机器运送到另一个机器)

  • 存储方式(程序和环境的存储)

  • API接口(不需要Tomcat等应用的命令了,都标准化了)

  • 灵活:即使是最复杂的应用也可以集装箱化

  • 轻量级:容器利用并共享主机内核

  • 便携式:可以在本地构建,部署到云,并在任何地方运行

Docker带来的好处
  • 开发团队的好处 完全可以控制所有的环境[同一个镜像] 降低了风险

image镜像

  • 存储:联合文件系统,UnionFS

容器和仓库

  • 镜像类似于Java中的类,而容器就是实例化
  • 容器的这一层是可以修改的,而镜像是不可以修改的
  • 同一个镜像可以生成多个容器独立运行,而她们之间没有任何的干扰
仓库
client和deamon
  • client[客户端]:提供給用户一个终端,用户输入Docker提供的命令来管理本地或远程的服务器
  • deamon服务端守护进程,接收Client发送的命令并执行相应的操作

Docker的安装

更换系统镜像

云服务器管理控制台 (aliyun.com) → 实例 → 更多 → 更换操作系统 → CentOS 7.6 64位

C:\Users\Pluminary>ssh root@47.98.225.105
    
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@    WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!     @
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY!
Someone could be eavesdropping on you right now (man-in-the-middle attack)!
It is also possible that a host key has just been changed.
The fingerprint for the ED25519 key sent by the remote host is
SHA256:THXnakQ0Se5ee+d7oHO0NYKBTW7mhEKc426m9+rQgnk.
Please contact your system administrator.
Add correct host key in C:\\Users\\Pluminary/.ssh/known_hosts to get rid of this message.
Offending ECDSA key in C:\\Users\\Pluminary/.ssh/known_hosts:7
Host key for 47.98.225.105 has changed and you have requested strict checking.
Host key verification failed.
    
//使用命令或手动删除 C:\\Users\\Pluminary/.ssh/known_hosts 防火墙拦截了
vim C:\\Users\\Pluminary/.ssh/known_hosts

C:\Users\Pluminary>ssh root@47.98.225.105
root@47.98.225.105's password:Panchunyao123!

[root@iZbp1dssknxftmjczbtpndZ ~]# cat /etc/redhat-release
CentOS Linux release 7.6.1810 (Core)
//yum地址换成国内yum源
   //wget-O 这是大写的字母O 含义是放到指定目录下 替换本地文件为国内文件
[root@iZbp1dssknxftmjczbtpndZ ~]# wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
[root@iZbp1dssknxftmjczbtpndZ ~]# yum clean all
[root@iZbp1dssknxftmjczbtpndZ ~]# yum makecache //读取新的源

//较旧的Docker版本称为docker或docker-engine。如果已安装这些程序,请卸载它们以及相关的依赖项。
[root@iZbp1dssknxftmjczbtpndZ ~]# yum remove docker \
                  docker-client \
                  docker-client-latest \
                  docker-common \
                  docker-latest \
                  docker-latest-logrotate \
                  docker-logrotate \
                  docker-engine
//如果yum报告未安装这些软件包,也没问题。
    //更新yum 先查看需要更新哪些
[root@iZbp1dssknxftmjczbtpndZ ~]# yum check-update
[root@iZbp1dssknxftmjczbtpndZ ~]# yum update
// 安装所需的软件包
[root@iZbp1dssknxftmjczbtpndZ ~]# yum install -y yum-utils \
                  device-mapper-persistent-data \
                  lvm2
// 使用以下命令来设置稳定的存储库 sudo是用超级管理员
[root@iZbp1dssknxftmjczbtpndZ ~]# sudo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

// 查看docker版本
[root@iZbp1dssknxftmjczbtpndZ ~]# yum list docker-ce --showduplicates | sort -r
// 安装指定的版本
[root@iZbp1dssknxftmjczbtpndZ ~]# yum install docker-ce-18.09.0 docker-ce-cli-18.09.0 containerd.io
// Docker 是服务器----客户端架构
// 命令行运行docker命令的时候,需要本机有 Docker 服务。用下面的命令启动
[root@iZbp1dssknxftmjczbtpndZ ~]# systemctl start docker
// 安装完成后,运行下面的命令,验证是否安装成功。
[root@iZbp1dssknxftmjczbtpndZ ~]# docker version 或者 docker info

第一个Docker容器

  • 下载镜像
  • docker pull [OPTIONS] NAME[:TAG]
  • docker images [OPTIONS] [REPOSITORY[:TAG]] 查看本机有没有任何镜像
[root@iZbp1dssknxftmjczbtpndZ ~]# docker pull hello-world
Using default tag: latest
latest: Pulling from library/hello-world
c1ec31eb5944: Pull complete
Digest: sha256:53641cd209a4fecfc68e21a99871ce8c6920b2e7502df0a20671c6fccc73a7c6
Status: Downloaded newer image for hello-world:latest
[root@iZbp1dssknxftmjczbtpndZ ~]# docker images
REPOSITORY          TAG                 IMAGE ID            CREATED             SIZE
hello-world         latest              d2c94e258dcb        11 months ago       13.3kB
  • 运行镜像
  • docker run [OPTIONS] IMAGE [COMMAND] [ARG…]
[root@iZbp1dssknxftmjczbtpndZ ~]# docker run hello-world
Hello from Docker!
This message shows that your installation appears to be working correctly.

To generate this message, Docker took the following steps:
 1. The Docker client contacted the Docker daemon.
 2. The Docker daemon pulled the "hello-world" image from the Docker Hub.
    (amd64)
 3. The Docker daemon created a new container from that image which runs the
    executable that produces the output you are currently reading.
 4. The Docker daemon streamed that output to the Docker client, which sent it
    to your terminal. 

To try something more ambitious, you can run an Ubuntu container with:
 $ docker run -it ubuntu bash

Share images, automate workflows, and more with a free Docker ID:
 https://hub.docker.com/

For more examples and ideas, visit:
 https://docs.docker.com/get-started/

后台运行容器

运行Nginx镜像,并访问到Docker容器内部
  • 前台、后台
[root@iZbp1dssknxftmjczbtpndZ ~]# docker images //查看一下程序镜像是否还在
[root@iZbp1dssknxftmjczbtpndZ ~]# docker pull hub.c.163.com/library/nginx:1.13.0
[root@iZbp1dssknxftmjczbtpndZ ~]# docker images
REPOSITORY                    TAG                 IMAGE ID            CREATED             SIZE
hello-world                   latest              d2c94e258dcb        11 months ago       13.3kB
hub.c.163.com/library/nginx   1.13.0              46102226f2fd        6 years ago         109MB
[root@iZbp1dssknxftmjczbtpndZ ~]# docker run hub.c.163.com/library/nginx:1.13.0
//上面的没反应是正常情况,再开一个cmd窗口


//第二个窗口:另一个cmd窗口 
C:\Users\Pluminary>ssh root@47.98.225.105
root@47.98.225.105's password:
Last login: Sun Apr  7 16:30:40 2024 from 182.102.75.173

Welcome to Alibaba Cloud Elastic Compute Service !
//展示当前容器列表
[root@iZbp1dssknxftmjczbtpndZ ~]# docker ps
CONTAINER ID        IMAGE                                COMMAND                  CREATED              STATUS              PORTS               NAMES
298f7e19f6a8        hub.c.163.com/library/nginx:1.13.0   "nginx -g 'daemon of…"   About a minute ago   Up About a minute   80/tcp              kind_taussig
//这时回到第一个窗口 ctrl+c 关掉运行 这时去第二个窗口docker ps的时候已经没有了
// ↓↓↓↓↓↓ 要把Nginx在后台运行才可以解决上述问题 ↓↓↓↓↓↓
[root@iZbp1dssknxftmjczbtpndZ ~]# docekr run -d hub.c.163.com/library/nginx:1.13.0
f124fc9171824e508639768b800efba5c780385dc1d135ab0ff70d80d3d75510 //返回容器ID
//此时再去第二个窗口 docker ps就会看到 CONTAINER ID 有f124fc917182
//第一个窗口 查看容器内部的风景 -i让容器输入有效 t是給我们分配一个终端 bash是启动终端
[root@iZbp1dssknxftmjczbtpndZ ~]# docker exec -it f124 bash //让docker明白我们让哪个启动
root@f124fc917182:/#  //已经进入到容器内部
root@f124fc917182:/# pwd
/
root@f124fc917182:/# touch 1 //创建了一个1
root@f124fc917182:/# ls
1  bin  boot  dev  etc  home  lib  lib32  lib64  libx32  media  mnt  opt  proc  root  run  sbin  srv  sys  tmp  usr  var
root@f124fc917182:/# which nginx
/usr/sbin/nginx

Docker三种网络模式 [网络会隔离 需要配置进行端口映射]

  • Bridge 网卡网络独立的端口
  • Host 同步宿主机的端口
  • None
  • 端口映射技术
实操
  • 访问Docker内的Nginx
cmd窗口2
[root@iZbp1dssknxftmjczbtpndZ ~]# docker stop f12
f12
[root@iZbp1dssknxftmjczbtpndZ ~]# docker ps //目前没有容器在运行了
CONTAINER ID        IMAGE               COMMAND             CREATED             STATUS              PORTS               NAMES
    //-p 本机的8080端口去映射内部的80端口 前面的是本机 后面的是容器内部的
[root@iZbp1dssknxftmjczbtpndZ ~]# docker run -d -p 8080:80 hub.c.163.com/library/nginx:1.13.0
3906129ced72b668581a58dc36595b08734b82e247e2927eb2a8da6fbe7508ae
//拔出端口的信息
[root@iZbp1dssknxftmjczbtpndZ ~]# netstat -na|grep 8080
tcp6       0      0 :::8080                 :::*                    LISTEN

在网页上输入 http://47.98.225.105:8080/

Welcome to nginx!
If you see this page, the nginx web server is successfully installed and working. Further configuration is required.
For online documentation and support please refer to nginx.org.
Commercial support is available at nginx.com.
Thank you for using nginx.

这个不是在宿主机启动的nginx 而是在容器中启动的【通过宿主机中转 并通过Bridge实现独立】
★★ 通过一台服务器上去布置多台Docker 而每台docker里面是独立的 并且可以通过一个端口 来实现和外界的映射关系 ★★
//这个与7行代码的区别 这里是大P 将这个容器的所有端口都进行映射
[root@iZbp1dssknxftmjczbtpndZ ~]# docker run -d -P hub.c.163.com/library/nginx:1.13.0

制作自己的Docker容器,dockerfile实战

[把自己的软件程序打包好传給别人]

  • dockerfile作用
  • 使用dockerfile的好处[一目了然 哪个环境跑在什么环境下 非常方便知道如何配置]
  • 写一个自己的dockerfile

因此我们只需要在dockerfile中指定需要哪些程序、依赖什么样的配置,之后把dockerfile交给“编译器”docker进行“编译”,也就是docker build命令,生成的可执行程序就是image,之后就可以运行这个image了,这就是docker run命令,image运行起来后就是docker container。

FROM alpine:latest #继承父类 alpine及其小的环境
MAINTAINER imooc   #描述这个镜像由誰维护的
CMD echo 'hello my dockerfile'  #进入任何想要的dockerfile命令
[root@iZbp1dssknxftmjczbtpndZ ~]# touch Dockerfile
[root@iZbp1dssknxftmjczbtpndZ ~]# ls
Dockerfile
[root@iZbp1dssknxftmjczbtpndZ ~]# vim Dockerfile
//按小写i进入编辑模式  按ESC退出编辑模式 :wq会保存
//取名字 .dockerfile现在的路径
[root@iZbp1dssknxftmjczbtpndZ ~]# docker build -t hello_docker .
    //先发送到服务器
Sending build context to Docker daemon  285.7kB
    //从父类中拉取下来
Step 1/3 : FROM alpine:latest
latest: Pulling from library/alpine
4abcf2066143: Pull complete
Digest: sha256:c5b1261d6d3e43071626931fc004f70149baeba2c8ec672bd4f27761f8e1ad6b
Status: Downloaded newer image for alpine:latest
 ---> 05455a08881e
    //写定它的维护者
Step 2/3 : MAINTAINER imooc
 ---> Running in 38a73abca50e
Removing intermediate container 38a73abca50e
---> f44831f49afe
    //把这个语句写进去
Step 3/3 : CMD echo 'hello my dockerfile'
 ---> Running in 1ff431539208
Removing intermediate container 1ff431539208
 ---> f45b88d0cabc
Successfully built f45b88d0cabc
Successfully tagged hello_docker:latest

//运行自己的镜像
[root@iZbp1dssknxftmjczbtpndZ ~]# docker images
REPOSITORY                    TAG                 IMAGE ID            CREATED             SIZE
hello_docker                  latest              f45b88d0cabc        3 minutes ago       7.38MB
alpine                        latest              05455a08881e        2 months ago        7.38MB
hello-world                   latest              d2c94e258dcb        11 months ago       13.3kB
hub.c.163.com/library/nginx   1.13.0              46102226f2fd        6 years ago         109MB
[root@iZbp1dssknxftmjczbtpndZ ~]# docker run hello_docker
hello my dockerfile

Nginx

  • Nginx介绍
  • Nginx的安装
  • 常用命令讲解和演示
  • 配置文件讲解
  • 场景实战:搭建一个静态文件的Nginx服务

Nginx介绍

  • Nginx是什么、适用场景
  • Nginx应用广泛
  • Nginx优点

Nginx是什么、适用场景

  • HTTP的反向代理服务器
  • 动态静态资源分离
正向代理

普通的请求转发,客户端把信息传递到代理服务器,代理服务器找到信息转发给我们
提供安全功能 代理服务器有防火墙,可以隐藏自身的信息

反向代理

提供安全和防火墙 在多个服务器后端提供负载均衡 为服务器提供缓存 把来自用户的压力平均分配[负载均衡]

动态静态资源分离 [加速访问 整体速度提高]
  • 不分离会变慢
  • 静态资源无需经过Tomcat,Tomcat只负责处理动态请求
  • 后缀为gif的时候,Nginx会直接获取到当前请求的文件并返回

Nginx应用广泛

Nginx的优点
  • 高并发、高性能
  • 可扩展性好
  • 高可靠性 [服务器运行可以达到数年之久]
  • 热部署
  • 开源、可商用

Nginx的安装

Nginx在CentOs安装
[root@iZbp1dssknxftmjczbtpndZ ~]# yum install yum-utils
[root@iZbp1dssknxftmjczbtpndZ ~]# vim /etc/yum.repos.d/nginx.repo  //输入源信息 告诉它从哪下载
//输入:3-18行 配置源
[root@iZbp1dssknxftmjczbtpndZ ~]# [nginx-stable]
name=nginx stable repo
baseurl=http://nginx.org/packages/centos/7/$basearch/
gpgcheck=1
enabled=1
gpgkey=https://nginx.org/keys/nginx_signing.key
module_hotfixes=true

[nginx-mainline]
name=nginx mainline repo
baseurl=http://nginx.org/packages/mainline/centos/7/$basearch/
gpgcheck=1
enabled=0
gpgkey=https://nginx.org/keys/nginx_signing.key
module_hotfixes=true
    //然后查看源 列出可以使用的ngix地址
[root@iZbp1dssknxftmjczbtpndZ ~]# yum list | grep nginx
    //运行安装命令
[root@iZbp1dssknxftmjczbtpndZ ~]# yum install nginx 1:1.16.1-1.el7.ngx
    //查看版本,若出现版本号,则安装成功
[root@iZbp1dssknxftmjczbtpndZ ~]# nginx -v
nginx version: nginx/1.24.0
用whereis nginx可以查看到目录:
[root@iZbp1dssknxftmjczbtpndZ ~]# nginx: /usr/sbin/nginx /usr/lib64/nginx /etc/nginx /usr/share/nginx /usr/share/man/man8/nginx.8.gz

常用命令讲解和演示

[root@iZbp1dssknxftmjczbtpndZ ~]# /usr/sbin/ngix 启动
[root@iZbp1dssknxftmjczbtpndZ ~]# systemctl stop docker //把之前的docker关停
[root@iZbp1dssknxftmjczbtpndZ ~]# ps -aux | grep nginx //提取相关进程 80是默认端口
[root@iZbp1dssknxftmjczbtpndZ ~]# nginx -h //提取帮助
    
[root@iZbp1dssknxftmjczbtpndZ ~]# cd /etc/nginx
[root@iZbp1dssknxftmjczbtpndZ nginx]# ls //里面有nginx.conf配置文件 conf.d中也有一个defalut.conf
conf.d  fastcgi_params  mime.types  modules  nginx.conf  scgi_params  uwsgi_params
[root@iZbp1dssknxftmjczbtpndZ nginx]# cd conf.d/
    
[root@iZbp1dssknxftmjczbtpndZ conf.d]# nginx -c   //读取指定配置文件
[root@iZbp1dssknxftmjczbtpndZ conf.d]# nginx -s stop //停止nginx
[root@iZbp1dssknxftmjczbtpndZ conf.d]# nginx -c /etc/nginx/nginx.conf //以这个文件启动nginx

[root@iZbp1dssknxftmjczbtpndZ conf.d]# nginx -t //发布前的测试
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: configuration file /etc/nginx/nginx.conf test is successful

[root@iZbp1dssknxftmjczbtpndZ conf.d]# nginx -s stop
[root@iZbp1dssknxftmjczbtpndZ conf.d]# /usr/sbin/nginx //开启nginx
[root@iZbp1dssknxftmjczbtpndZ conf.d]# nginx -t //查看使用的哪个配置文件
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: configuration file /etc/nginx/nginx.conf test is successful

[root@iZbp1dssknxftmjczbtpndZ conf.d]# nginx -v //打印版本信息
[root@iZbp1dssknxftmjczbtpndZ conf.d]# nginx -V //打印详细参数
    
[root@iZbp1dssknxftmjczbtpndZ conf.d]# whereis uginx//找nginx的路径
nginx: /usr/sbin/nginx /usr/lib64/nginx /etc/nginx /usr/share/nginx /usr/share/man/man8/nginx.8.gz
[root@iZbp1dssknxftmjczbtpndZ conf.d]# ps aux | grep nginx //打印进程信息

    //-s信号 1.stop立即停止 2.quit优雅停止 3.reload重启[优雅停止quit] 4.reopen更换日志文件

配置文件讲解

  • 语法
    • “; “ 结尾
    • “{}” 组织多条指令
    • “include” 引入
    • ‘’#’’ 注释
  • 实操演示
  • 默认配置文件分析
    • nginx.conf
    • default.conf
[root@iZbp1dssknxftmjczbtpndZ conf.d]# cat /etc/nginx/nginx.conf //打开文件配置
nginx.conf配置文件讲解
首先我们进入到cd etc/nginx.然后通过ls查看nginx目录的相关内容。在nginx目录下,我们需要关注nginx.conf文件,这个文件是我们的主配置文件,cat打开:
cat nginx.conf 【效果显示在下方分割线后】

[root@iZbp1dssknxftmjczbtpndZ conf.d]# cd /usr/share/nginx/html
[root@iZbp1dssknxftmjczbtpndZ html]# vim stst.html //新建文件
[root@iZbp1dssknxftmjczbtpndZ html]# ls
50x.html     index.html
[root@iZbp1dssknxftmjczbtpndZ html# vim test.html //新建一个html文件保存下来
//i编辑       hello nginx           esc + :wq
[root@iZbp1dssknxftmjczbtpndZ conf.d]# nginx -c /etc/nginx/nginx.conf //启动!
//如果不显示一定是服务器规则组的问题  自定义 TCP    目的:1/65535   源:0.0.0.0/0
网址输入:http://47.98.225.105/test.html 
============================================================================
# 运行用户,默认是nginx
user  nginx;
# nginx进程数,一般设置为和cpu核数一样
worker_processes  1;

# 全局错误日志路径
error_log  /var/log/nginx/error.log warn;
# 进程pid路径
pid        /var/run/nginx.pid;
 
events {
# 最大连接数
    worker_connections  1024;
}

# 设置http服务器
http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;
# 设置日志的格式
    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';
# 访问日志的路径
    access_log  /var/log/nginx/access.log  main;

# 开启高效传输模式
    sendfile        on;
    #tcp_nopush     on;
# 长连接超时时间,单位是秒
    keepalive_timeout  65;
#传输时是否压缩,压缩的话需要解压,但是传的大小就小了
    #gzip  on;
#加载其他的配置文件,一带多
    include /etc/nginx/conf.d/*.conf;
}
============================================================================

搭建静态资源服务器

搭建一个静态文件的Nginx服务
  • 实操演示
  • 配置和网页文件作为教辅提供 [静态网页]
//scp传输文件 -r每一个文件夹都传递归 /usr/share/nginx/web/ 上传到这个文件夹
cmd本机框
C:\Users\Pluminary> scp -r /Users/Pluminary/Desktop/静态资源/静态网页/ root@47.98.225.105:/usr/share/nginx/web/

cmd服务器框
[root@iZbp1dssknxftmjczbtpndZ ~]# cd /usr/share/nginx/web/
[root@iZbp1dssknxftmjczbtpndZ web]# pwd //查看现在位置
/usr/share/nginx/web
[root@iZbp1dssknxftmjczbtpndZ web]# ls
css  fonts  images  index.html  js
//修改文件 可以让nginx访问到文件 访问web下面的内容
[root@iZbp1dssknxftmjczbtpndZ web]# vim /etc/nginx/conf.d/default.conf  
//修改location:.../web
===============================================================
location / {
        root   /usr/share/nginx/web;
        index  index.html index.htm;
    }

    #error_page  404              /404.html;

    # redirect server error pages to the static page /50x.html
    #
    error_page   500 502 503 504  /50x.html;
    location = /50x.html {
        root   /usr/share/nginx/web;
    }
===============================================================
[root@iZbp1dssknxftmjczbtpndZ web]# nginx -t //测试一下是否有问题
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: configuration file /etc/nginx/nginx.conf test is successful
[root@iZbp1dssknxftmjczbtpndZ web]# nginx -s reload //重启服务
//浏览器访问:http://47.98.225.105/index.html
[root@iZbp1dssknxftmjczbtpndZ web]# cat /etc/nginx/nginx.conf
//进去看到 access_log  /var/log/nginx/access.log  main; 打开它
[root@iZbp1dssknxftmjczbtpndZ web]# cat /var/log/nginx/access.log //打开日志文件

RabbitMQ

  • 初识RabbitMQ
  • RabbitMQ的安装和启动 [Erlang语言]
  • RabbitMQ管理后台
  • 实战案例演示
  • 交换机工作模式 [fanout、direct、topic、headers]
  • SpringBoot整合RabbitMQ

初识RabbitMQ

  • 核心思想:接收并转发消息。类似于邮局

  • producer信息生产者

  • queue队列

  • consumer会从queue中获取消息

消息队列

  • 什么是消息队列
  • MQ(Message Queue)
消息队列的性质
  • 业务无关 [不需要考虑上层业务模型]
  • FIFO [先进先出]
  • 容灾 [对于消息队列可以持久化 断电后也可以保存数据后重新发送]
  • 性能
为什么要用消息队列
  • 系统解耦
  • 异步调用 [我告诉你要做 我可以不等你做完 我再返回 相互不影响]
    [点外卖 用户发送外卖请求給中间件,随后MQ帮助分发后续流程(扣钱、召唤骑手 计算天气、时间等)]
  • 流量削峰 [MQ把请求先存在队列中 以合适的速度发送 化解压力]

RabbitMQ的特点和核心概念

  • 开源、跨语言
  • Erlang语言编写 [交换机、通信方面] [数据复制与转发性能好]
  • 应用广泛
  • 社区活跃、API丰富
AMQP协议
  • Advanced Message Queuing Protocol 高级消息队列协议
RabbitMQ核心概念
  • Server:服务
  • connection:与Server建立连接
  • channel:信道,几乎所有的操作都在信道上进行,客户端可以建立多个信道
  • message:消息,由properties和body组成
  • virtual host:虚拟主机,顶层隔离。同一个虚拟主机下,不能由重复的交换机和queue
  • Exchange:交换机,接收生产者的信息的,然后根据指定的路由器把消息转发到所绑定的队列上
  • binding:绑定交换机和队列
  • routing key:路由键,路由规则,虚拟机可以用它来确定这个消息如何进行一个路由
  • queue:队列,消费者只需要监听队列来消费消息,不需要关注消息来自于哪个Exchange
  • Exchange和Message Queue存在着绑定的关系,一个Exchange可以绑定多个消息队列

RabbitMQ的安装和启动

  • 安装Erlang
  • 安装RabbitMQ
  • 启动RabbitMQ
安装Erlang
  • 安装erlang-rpm包,该包经过RabbitMQ官方处理
  • 使用Erlang Solutions源进行安装
  • 使用EPEL(“Extra Packages for Enterprise Linux”)进行安装
我们先准备两个安装包://★★★★别看下面胡咧咧 直接教辅中有erlang和rabbitmq与教材相同★★★★
RabbitMQ: https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.8.13
Erlang:https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpm

注意: 如果说你选的是最新的版本那么你就要选取对应支持的 erlang 的版本才行,具体看下面的连接
https://www.rabbitmq.com/which-erlang.html

这些都准备好之后我们使用 Xptf 软件把这个安装包传输到 linux 端对应文件下(自己建一个文件夹)即可。

安装
第一步:
我们第一步先要对 linux 端进行一个 erlang 的环境配置,所以我们要先解压 erlang 包。
    # rpm -Uvh erlang-solutions-2.0-1.noarch.rpm
    # yum install -y erlang
安装完成后,查看一下版本号,能查到说明就是安装好了
    # erl -v

第二步:
在安装一下
    # yum install -y socat

第三步:
接下来我们就需要进行 RabbitMQ 的安装了

    # rpm -Uvh rabbitmq-server-3.8.33-1.el8.noarch.rpm 
    # yum install rabbitmq-server -y

安装完之后我们来启动测试一下:

    # systemctl start rabbitmq-server
    # systemctl status rebbitmq-server

最后一步:
安装跑起来之后我们设置一下开机自启动

    # systemctl enable rabbitmq-server
    # systemctl stop rabbitmq-server   # 关闭开机自起
=========================================================
可以在这些目录中查找RabbitMQ的配置文件、启动脚本等。另外,RabbitMQ的启动脚本通常会被安装到/usr/sbin或/usr/bin目录下。你可以尝试使用以下命令来查找RabbitMQ的启动脚本位置:

find /usr/sbin /usr/bin -name 'rabbitmq*'
=========================================================
[root@iZbp1dssknxftmjczbtpndZ ~]# cd /usr/sbin
[root@iZbp1dssknxftmjczbtpndZ sbin]# systemctl start rabbitmq-server
[root@iZbp1dssknxftmjczbtpndZ sbin]# systemctl status rabbitmq-server
● rabbitmq-server.service - RabbitMQ broker
   Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
   Active: active (running) since Mon 2024-04-08 23:24:39 CST; 38min ago
 Main PID: 3570 (beam.smp)
===================================================================================
[root@iZbp1dssknxftmjczbtpndZ sbin]# systemctl enable rabbitmq-server
[root@iZbp1dssknxftmjczbtpndZ sbin]# systemctl stop rabbitmq-server   # 关闭开机自起

Linux环境如何彻底卸载感干净RabbitMQ_linux卸载rabbitmq-CSDN博客

停止RabbitMQ
$rabbitmqctl stop

设置开机启动
$ systemctl enable rabbitmq-server 

启动RabbitMQ
$ systemctl start rabbitmq-server

看看端口有没有起来,查看状态

$ rabbitmqctl status 

要检查RabbitMQ服务器的状态,请运行:

systemctl status rabbitmq-server

开启web管理界面
rabbitmq-plugins enable rabbitmq_management

RabbitMQ管理后台

启动RabbitMQ
  • 启动RabbitMQ:systemctl start rabbitmq-server
  • 查看状态:rabbitmqctl status
  • 启动管理台
  • 配置admin用户
[root@iZbp1dssknxftmjczbtpndZ ~]# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@iZbp1dssknxftmjczbtpndZ:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@iZbp1dssknxftmjczbtpndZ...
Plugin configuration unchanged. ##添加账户
[root@iZbp1dssknxftmjczbtpndZ ~]# rabbitmqctl add_user admin password
Adding user "admin" ...            ##设置管理员
[root@iZbp1dssknxftmjczbtpndZ ~]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...
浏览器进入RabbitMQ管理后台 输入 http://47.98.225.105:15672/
RabbitMQ管理后台
  • 浏览页面
  • 添加用户
  • 创建虚拟主机(Virtual Hosts) => 都有各自的队列、交换机

第一个生产者

实战案例演示
  • 新建项目
  • Hello World
    P(生产者)→hello→C(消费者)
  • 创建一个rabbitmq(Maven)新项目
  • rabbitmq支持多语言
登录上RabbitMQ后台 上面Admin 右面Virtual Hosts 
点表里的 /  下面增加一个admin

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>rabbitmq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>20</maven.compiler.source>
        <maven.compiler.target>20</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.29</version>
        </dependency>
    </dependencies>

</project>
helloworld/send.java
package helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Hello World的发送类,连接到RabbitMQ服务端,然后发送一条消息后退出
 */
public class send {
    //队列名字
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("47.98.225.105");
        factory.setUsername("admin");
        factory.setPassword("password");
        //建立连接
        Connection connection = factory.newConnection();
        //获得信道
        Channel channel = connection.createChannel();
        //声明队列 参数:第二个会不会随着重启消失 第三个队列是否仅能給连接使用 第四个未使用自动删除? 第五个参数
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发布消息
        String message = "Hello World!";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
        System.out.println("发送了消息:" + message);
        //关闭连接
        channel.close();
        connection.close();
    }
}
helloworld/Recv.java
package helloworld;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;

/**
 * 接收消息,并打印,持续运行
 */
public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("47.98.225.105");
        factory.setUsername("admin");
        factory.setPassword("password");
        //建立连接
        Connection connection = factory.newConnection();
        //获得信道
        Channel channel = connection.createChannel();
        //声明队列 参数:第二个会不会随着重启消失 第三个队列是否仅能給连接使用 第四个未使用自动删除? 第五个参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("开始接收消息");
        //接收消息并消费 2:是否确认收到(快递签收 自动消息确认) 3:消息收到后进行处理
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            //收到信息后会执行的函数
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到消息:" + message);
            }
        });
    }
}
=======================================================
//由于是持续接收消息的 若将send.java中改为Hello World2 将会持续打印到控制台
开始接收消息
收到消息:Hello World!
收到消息:Hello World2!

根据消息内容做处理

多个消费者分担压力

RabbitMQ后台http://47.98.225.105:15672/

循环调度
workqueues/NewTask.java
package workqueues;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者:生产批量消息
 */
public class NewTask {
    private final static String TAKS_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("47.98.225.105");
        factory.setUsername("admin");
        factory.setPassword("password");
        //建立连接
        Connection connection = factory.newConnection();
        //获得信道
        Channel channel = connection.createChannel();
        //声明队列 参数:第二个会不会随着重启消失 第三个队列是否仅能給连接使用 第四个未使用自动删除? 第五个参数
        channel.queueDeclare(TAKS_QUEUE_NAME,true,false,false,null);
        for (int i = 0; i < 10; i++) {
            String message = i + "...";
            channel.basicPublish("",TAKS_QUEUE_NAME,null,message.getBytes("UTF-8"));
            System.out.println("发送了消息:" + message);
        }
        //关闭连接
        channel.close();
        connection.close();
    }
}
=============================================================================================
发送了消息:0...
发送了消息:1...
发送了消息:2...
发送了消息:3...
发送了消息:4...
发送了消息:5...
发送了消息:6...
发送了消息:7...
发送了消息:8...
发送了消息:9...
workqueues/Worker.java
package workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者,接收前面的批量消息
 */
public class Worker {
    private final static String TAKS_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("47.98.225.105");
        factory.setUsername("admin");
        factory.setPassword("password");
        //建立连接
        Connection connection = factory.newConnection();
        //获得信道
        Channel channel = connection.createChannel();
        //声明队列 参数:第二个会不会随着重启消失 第三个队列是否仅能給连接使用 第四个未使用自动删除? 第五个参数
        channel.queueDeclare(TAKS_QUEUE_NAME, true, false, false, null);
        System.out.println("开始接收消息");
        channel.basicConsume(TAKS_QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到了消息:" + message);
                try{
                    doWork(message);
                }finally {
                    System.out.println("完成消息处理");
                }
            }
        });
    }

    private static void doWork(String task) {
        //有点延迟1秒
        char[] chars = task.toCharArray();
        for (char ch : chars) {
            if (ch == '.') {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
}
==========================================================================================
收到了消息:0...
完成消息处理
收到了消息:1...
完成消息处理
收到了消息:2...
完成消息处理
收到了消息:3...
完成消息处理
收到了消息:4...
完成消息处理
收到了消息:5...
完成消息处理
收到了消息:6...
完成消息处理
收到了消息:7...
完成消息处理
收到了消息:8...
完成消息处理
收到了消息:9...
完成消息处理
并行 多个消费者平均压力!!!
公平派遣[因为奇数偶数压力不同] + 消息确认 【以工作量的程度去分任务
//在Worker消费方打开并行消费 Run/Debug Configurations → Modify options蓝字 → Allow multiple instances             此时两个woker开始配合平均压力 一起工作
workqueues/Worker.java
package workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者,接收前面的批量消息
 */
public class Worker {
    private final static String TAKS_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("47.98.225.105");
        factory.setUsername("admin");
        factory.setPassword("password");
        //建立连接
        Connection connection = factory.newConnection();
        //获得信道
        Channel channel = connection.createChannel();
        //声明队列 参数:第二个会不会随着重启消失 第三个队列是否仅能給连接使用 第四个未使用自动删除? 第五个参数
        channel.queueDeclare(TAKS_QUEUE_NAME, true, false, false, null);
        System.out.println("开始接收消息");
        channel.basicQos(1); //最希望处理的数量 处理完之前不会接收下一个任务
        channel.basicConsume(TAKS_QUEUE_NAME, false, new DefaultConsumer(channel) { //关掉自动接收 要手动确认false
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到了消息:" + message);
                try{
                    doWork(message);
                }finally {
                    System.out.println("完成消息处理");
                    channel.basicAck(envelope.getDeliveryTag(), false); //false 不同时一起确认 手动确认消息
                }
            }
        });
    }

    private static void doWork(String task) {
        //有点延迟1秒
        char[] chars = task.toCharArray();
        for (char ch : chars) {
            if (ch == '.') {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
}
=======================================================================================
开始接收消息
收到了消息:1
消息处理完成
收到了消息:2...
消息处理完成
......

交换机工作模式

  • fanout:广播,这种模式只需要将队列绑定到交换机上即可,是不需要设置路由键的
  • direct:根据RoutingKey匹配消息路由到指定队列
  • topic:生产者指定RoutingKey消息根据消费端指定的队列通过模糊匹配的方式进行相应转发
  • headers:根据发送消息内容中的headers属性来匹配
fanout模式
广播,这种模式只需要将队列绑定到交换机上即可,是不需要设置路由键的 [所有消息无差别发送]

fanout/EmitLog.java【生产者】
package fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 发送日志信息
 */
public class EmitLog {
    //创建交换机
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.98.225.105");
        factory.setUsername("admin");
        factory.setPassword("password");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        String message = "info: Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "",null, message.getBytes("UTF-8"));
        System.out.println("发送了消息:" + message);
        channel.close();
        connection.close();
    }
}
fanout/ReceiveLogs.java【消费者】
package fanout;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 描述:接收日志消息 有多个接收同样的消息
 */
public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.98.225.105");
        factory.setUsername("admin");
        factory.setPassword("password");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        //非持久会自动删除的队列 在同一个类中多次启动 每一次队列名字都不一样
        String queueName = channel.queueDeclare().getQueue();
        //交换机和队列的绑定
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println("开始接收消息");
        Consumer consumer =  new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到消息:"+message);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
direct模式
根据RoutingKey匹配消息路由到指定队列 [消费者接收消息不一致]

direct/ReceiveLogsDirect1.java
package direct;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * 接收3个等级的日志
 */
public class ReceiveLogsDirect1 {

    private static final String EXCHANGE_NAME = "direct_log";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.98.225.105");
        factory.setUsername("admin");
        factory.setPassword("password");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //生成一个随机的临时的queue
        String queueName = channel.queueDeclare().getQueue();

        //一个交换机同时绑定三个queue
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");
        channel.queueBind(queueName, EXCHANGE_NAME, "error");

        System.out.println(" 开始接收消息");

        Consumer consumer =  new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到消息:"+message);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
=============================================================================
收到消息:Hello World!
收到消息:Hello World!
收到消息:Hello World!
收到消息:Hello World!
direct/ReceiveLogsDirect2.java
package direct;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;

/**
 * 接收1个等级的日志
 */
public class ReceiveLogsDirect2 {

    private static final String EXCHANGE_NAME = "direct_log";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.98.225.105");
        factory.setUsername("admin");
        factory.setPassword("password");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //生成一个随机的临时的queue
        String queueName = channel.queueDeclare().getQueue();

        //一个交换机同时绑定两个queue,比刚第一个Receiver少接收了一个error等级
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");

        System.out.println("开始接收消息");

        Consumer consumer =  new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到消息:"+message);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
=============================================================================
收到消息:Hello World!
收到消息:Hello World!
收到消息:Hello World!
direct/EmitLogDirect.java
package direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_log";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.98.225.105");
        factory.setUsername("admin");
        factory.setPassword("password");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明交换机 DIRECT
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String message = "Hello World!";
        //发送第一个等级消息
        channel.basicPublish(EXCHANGE_NAME, "info",null, message.getBytes("UTF-8"));
        System.out.println("发送了消息:" + "等级为info,消息内容:" + message);

        //发送第二个等级消息
        channel.basicPublish(EXCHANGE_NAME, "warning",null, message.getBytes("UTF-8"));
        System.out.println("发送了消息:" + "等级为warning,消息内容:" + message);

        //发送第三个等级消息
        channel.basicPublish(EXCHANGE_NAME, "error",null, message.getBytes("UTF-8"));
        System.out.println("发送了消息:" + "等级为error,消息内容:" + message);
        channel.close();
        connection.close();
    }
}
=============================================================================
发送了消息:等级为info,消息内容:Hello World!
发送了消息:等级为warning,消息内容:Hello World!
发送了消息:等级为error,消息内容:Hello World!
topic模式
比如消息严重性怎么样、只想记录error模块的用户信息
  • ***** 可以替代一个单词
  • # 可以替代零个或多个单词

【发送者/生产者】
topic/EmitLogTopic.java
package topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

/**
 * topic模式交换机,发送消息
 */
public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("47.98.225.105");
            factory.setUsername("admin");
            factory.setPassword("password");

            connection = factory.newConnection();
            channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            String message = "Animal WorldroutingKey";

            String[] routingKeys = new String[9];
            routingKeys[0] = "quick.orange.rabbit";
            routingKeys[1] = "lazy.orange.elephant";
            routingKeys[2] = "quick.orange.fox";
            routingKeys[3] = "lazy.brown.fox";
            routingKeys[4] = "lazy.pink.rabbit";
            routingKeys[5] = "quick.brown.fox";
            routingKeys[6] = "orange";
            routingKeys[7] = "quick.orange.male.rabbit";
            routingKeys[8] = "lazy.orange.male.rabbit";
            for (int i = 0; i < routingKeys.length; i++) {
                channel.basicPublish(EXCHANGE_NAME, routingKeys[i], null,
                        message.getBytes("UTF-8"));
                System.out.println("发送了:" + message + ":" + routingKeys[i]);
            }


        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ignore) {
                }
            }
        }
    }
}
==================================================================================
发送了:Animal WorldroutingKey:quick.orange.rabbit
发送了:Animal WorldroutingKey:lazy.orange.elephant
发送了:Animal WorldroutingKey:quick.orange.fox
发送了:Animal WorldroutingKey:lazy.brown.fox
发送了:Animal WorldroutingKey:lazy.pink.rabbit
发送了:Animal WorldroutingKey:quick.brown.fox
发送了:Animal WorldroutingKey:orange
发送了:Animal WorldroutingKey:quick.orange.male.rabbit
发送了:Animal WorldroutingKey:lazy.orange.male.rabbit
【接收者/消费者Ⅰ(对"*.orange.*" 数据感兴趣)】
package topic;

import com.rabbitmq.client.*;

import java.io.IOException;
/**
 * 特定路由键
 */
public class ReceiveLogsTopic1 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.98.225.105");
        factory.setUsername("admin");
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String queueName = channel.queueDeclare().getQueue();

        //指定bindingKey 最重要的一步!
        String bindingKey = "*.orange.*";
        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

        System.out.println("开始接收消息");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(
                        " 接收消息: " + message + ":" + envelope.getRoutingKey());
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
==================================================================================
开始接收消息
 接收消息: Animal WorldroutingKey:quick.orange.rabbit
 接收消息: Animal WorldroutingKey:lazy.orange.elephant
 接收消息: Animal WorldroutingKey:quick.orange.fox
【接收者/消费者Ⅱ(对"*.*.rabbit" 和 "lazy.#" 数据感兴趣)】
package topic;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
/**
 * 特定路由键
 */
public class ReceiveLogsTopic2 {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.98.225.105");
        factory.setUsername("admin");
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String queueName = channel.queueDeclare().getQueue();

        //指定bindingKey
        String bindingKey1 = "*.*.rabbit";
        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey1);
        String bindingKey2 = "lazy.#";
        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey2);

        System.out.println("开始接收消息");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(
                        " 接收消息: " + message + ":" + envelope.getRoutingKey());
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
==================================================================================
开始接收消息
 接收消息: Animal WorldroutingKey:quick.orange.rabbit
 接收消息: Animal WorldroutingKey:lazy.orange.elephant
 接收消息: Animal WorldroutingKey:lazy.brown.fox
 接收消息: Animal WorldroutingKey:lazy.pink.rabbit
 接收消息: Animal WorldroutingKey:lazy.orange.male.rabbit
  • headers:根据发送消息内容中的headers属性来匹配

Spring Boot整合RabbitMQ

  • 实操代码演示

创建两个Springboot项目
spring-boot-rabbitmq-consumer 和 spring-boot-rabbitmq-consumer
在application.properties中设置 如果是本机用户名和密码都是guest
spring.rabbitmq.username=guest

项目:spring-boot-rabbitmq-producer [消息发送者/生产者]
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.imooc</groupId>
    <artifactId>spring-boot-rabbitmq-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-rabbitmq-consumer</name>
    <description>spring-boot-rabbitmq-consumer</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
application.properties
spring.application.name=spring-boot-rabbitmq-producer
server.port=8080
spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
com/imooc/springbootrabbitmqproducer/MsgSender.java
package com.imooc.springbootrabbitmqproducer;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 发送消息
 */
//@Component让springboot发现
@Component
public class MsgSender {
    @Autowired
    private AmqpTemplate rabbitmqTemplate;
    public void send1(){
        String message = "This is message 1, routing key is dog.red";
        System.out.println("发送了:" + message);
        //指定交换机的名字  routing key is dog.red  消息内容
        this.rabbitmqTemplate.convertAndSend("bootExchange","dog.red",message);
    }
    public void send2(){
        String message = "This is message 2, routing key is dog.black";
        System.out.println("发送了:" + message);
        //指定交换机的名字  routing key is dog.red  消息内容
        this.rabbitmqTemplate.convertAndSend("bootExchange","dog.black",message);
    }
}
com/imooc/springbootrabbitmqproducer/TopicRabbitConfig.java
package com.imooc.springbootrabbitmqproducer;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * rabbitmq配置类
 */
@Configuration
public class TopicRabbitConfig {
    @Bean
    public Queue queue1(){
        return new Queue("queue1");
    }
    @Bean
    public Queue queue2(){
        return new Queue("queue2");
    }
    //用一个Bean定义交换机
    @Bean
    TopicExchange exchange(){
        return new TopicExchange("bootExchange");
    }

    //有了Q1 Q2后去指定一个topic交换机
    //绑定到交换机上
    @Bean
    Binding bingdingExchangeMessage1(Queue queue1, TopicExchange exchange){
        //to绑定到哪个交换机 with指定routingKey
        return BindingBuilder.bind(queue1()).to(exchange).with("dog.red");
    }
    @Bean
    Binding bingdingExchangeMessage2(Queue queue2, TopicExchange exchange){
        //to绑定到哪个交换机 with指定routingKey
        return BindingBuilder.bind(queue2()).to(exchange).with("dog.#");
    }
}
用test的测试类
com/imooc/springbootrabbitmqproducer/SpringBootRabbitmqProducerApplicationTests.java
package com.imooc.springbootrabbitmqproducer;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringBootRabbitmqProducerApplicationTests {

    @Autowired
    MsgSender msgSender;

    @Test
    public void send1(){
        msgSender.send1();
    }
    @Test
    public void send2(){
        msgSender.send2();
    }
}
项目:spring-boot-rabbitmq-consumer [消息接收者/消费者]
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.imooc</groupId>
    <artifactId>spring-boot-rabbitmq-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-rabbitmq-consumer</name>
    <description>spring-boot-rabbitmq-consumer</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
application.properties
spring.application.name=spring-boot-rabbitmq-consumer
server.port=8081
spring.rabbitmq.addresses=127.0.0.1:15672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
Receiver1.java
package com.imooc.springbootrabbitmqconsumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消费者1
 */
@Component
@RabbitListener(queues = "queue1")
public class Receiver1 {
    @RabbitHandler //收到这个消息后怎么处理
    public void process(String message){
        System.out.println("Receiver1:" + message);
    }
}
Receiver2.java
package com.imooc.springbootrabbitmqconsumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消费者2
 */
@Component
@RabbitListener(queues = "queue2")
public class Receiver2 {
    @RabbitHandler //收到这个消息后怎么处理
    public void process(String message){
        System.out.println("Receiver2:" + message);
    }
}

先启动消费者 然后再启动生产者test中的测试
就可以发现
send1() → 发送了:This is message 1, routing key is dog.red
send2() → 发送了:This is message 2, routing key is dog.black