JUC编程(JDK1.8)

​ java.util.concurrent
​ java.util.concurrent.atomic
​ java.util.concurrent.locks

1,线程和进程

进程里边包含线程,一个进程有且至少包含一个线程
java默认有2个线程,main和GC线程

并发和并行

并发:单挑
并行:群殴

线程有几种状态

public enum State {
        /**
         * Thread state for a thread which has not yet started.
         	新生
         */
        NEW,

        /**
         * Thread state for a runnable thread.  A thread in the runnable
         * state is executing in the Java virtual machine but it may
         * be waiting for other resources from the operating system
         * such as processor.
         	运行
         */
        RUNNABLE,

        /**
         * Thread state for a thread blocked waiting for a monitor lock.
         * A thread in the blocked state is waiting for a monitor lock
         * to enter a synchronized block/method or
         * reenter a synchronized block/method after calling
         * {@link Object#wait() Object.wait}.
         	阻塞
         */
        BLOCKED,

        /**
         * Thread state for a waiting thread.
         * A thread is in the waiting state due to calling one of the
         * following methods:
         * <ul>
         *   <li>{@link Object#wait() Object.wait} with no timeout</li>
         *   <li>{@link #join() Thread.join} with no timeout</li>
         *   <li>{@link LockSupport#park() LockSupport.park}</li>
         * </ul>
         *
         * <p>A thread in the waiting state is waiting for another thread to
         * perform a particular action.
         *
         * For example, a thread that has called <tt>Object.wait()</tt>
         * on an object is waiting for another thread to call
         * <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
         * that object. A thread that has called <tt>Thread.join()</tt>
         * is waiting for a specified thread to terminate.
         	等待
         */
        WAITING,

        /**
         * Thread state for a waiting thread with a specified waiting time.
         * A thread is in the timed waiting state due to calling one of
         * the following methods with a specified positive waiting time:
         * <ul>
         *   <li>{@link #sleep Thread.sleep}</li>
         *   <li>{@link Object#wait(long) Object.wait} with timeout</li>
         *   <li>{@link #join(long) Thread.join} with timeout</li>
         *   <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
         *   <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
         * </ul>
         	超时等待
         */
        TIMED_WAITING,

        /**
         * Thread state for a terminated thread.
         * The thread has completed execution
         	终止 死亡
         */
        TERMINATED;
    }

wait和sleep的区别

1,类不同
wait Object 等待
sleep Thread 睡眠

2,锁的释放
wait 会释放锁
sleep 不会释放锁

3,使用的范围不同
wait必须在同步代码块中使用
sleep可以到处睡觉

4,异常的捕获
wait不需要捕获异常
sleep必须捕获异常

2,lock锁

Synchronized

package org.ph;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph
 * @project juc-api
 * @since 2022 -02 -21 21:53
 */
public class SaleTicket {
    public static void main(String[] args) {
        //什么是并发,多个线程同时操作同一个资源类(Ticket)

        //new 资源类
        Ticket ticket = new Ticket();

        //三个卖票员同时卖30张票
        new Thread(() -> {
            for (int i = 0; i < 40; i++) {
                ticket.sale();
            }
        }, "卖票员A").start();
        new Thread(() -> {
            for (int i = 0; i < 40; i++) {
                ticket.sale();
            }
        }, "卖票员B").start();
        new Thread(() -> {
            for (int i = 0; i < 40; i++) {
                ticket.sale();
            }
        }, "卖票员C").start();

    }

}

class Ticket {
    private int number = 30;

   	 /**
         * synchronized:
         *  同步锁的本质是队列,每个资源排队得到锁
         *  锁什么?
         *      修饰实例方法,作用于当前实例加锁,进入同步代码前要获得当前实例的锁
         *      修饰静态方法,作用于当前类对象加锁,进入同步代码前要获得当前类对象的锁
         *      修饰代码块,指定加锁对象,对给定对象加锁,进入同步代码库前要获得给定对象的锁
         */
    public synchronized void sale() {
        if (number > 0) {
            System.out.println(Thread.currentThread().getName() + "卖出了" + (number--) + "张票,剩余了" + number);
        }
    }
}

JUC(Lock)

Lock锁

两个核心方法

	//加锁  
	Lock l = ...; l.lock(); 
		try { 
    // access the resource protected by this lock 
  } finally {
    //解锁
    l.unlock(); 
  } 

三个实现类

image-20220221222908347

使用 Lock lock = new ReentrantLock();

package org.ph;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph
 * @project juc-api
 * @since 2022 -02 -21 22:20
 */
public class SaleTicketLock {
    public static void main(String[] args) {
        //什么是并发,多个线程同时操作同一个资源类(Ticket)
        //new 资源类
        TicketLock ticket = new TicketLock();

        //三个卖票员同时卖30张票
        new Thread(() -> {for (int i = 0; i < 40; i++) ticket.sale();}, "卖票员A").start();
        new Thread(() -> {for (int i = 0; i < 40; i++) ticket.sale();}, "卖票员B").start();
        new Thread(() -> {for (int i = 0; i < 40; i++) ticket.sale();}, "卖票员C").start();
    }

}

class TicketLock {
    private int number = 30;

    /**
     * 使用锁:
     * 公平锁:老老实实排队
     * 非公平锁:可以插队  (默认,因为有优先级,不至于执行时间少的线程一直排队)
     * 默认是非公平锁 sync = new NonfairSync();
     */
    Lock lock = new ReentrantLock();

    public void sale() {
        lock.lock();

        try {
            //具体的业务代码
            if (number > 0) {
                System.out.println(Thread.currentThread().getName() + "卖出了" + (number--) + "张票,剩余了" + number);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}	




//
    Lock lock = new ReentrantLock();


/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}

/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

Synchronized和Lock的区别

1,Synchronized无法判断获取锁的状态,Lock可以判断是否获取到了锁

2,Synchronized会自动释放锁,Lock锁手动释放,否则会出现死锁

3,Synchronized线程获得锁后阻塞后,其他线程会死等;Lock有一个 lock.tryLock(); 尝试获取锁

4,Synchronized 可重入锁,不可中断的非公平锁;Lock可重入锁,可判断,自己设置是否公平。

5,Synchronized 适合少量的代码同步问题,Lock适合锁大量的同步代码块。

3,生产者消费者

synchroized

用到的方法

Modifier and Type

Method and Description

void

notify() 唤醒正在等待对象监视器的单个线程。

void

notifyAll() 唤醒正在等待对象监视器的所有线程。

void

wait() 导致当前线程等待,直到另一个线程调用该对象的 notify()方法或 notifyAll()方法。

void

wait(long timeout) 导致当前线程等待,直到另一个线程调用 notify()方法或该对象的 notifyAll()方法,或者指定的时间已过。

void

wait(long timeout, int nanos) 导致当前线程等待,直到另一个线程调用该对象的 notify()方法或 notifyAll()方法,或者某些其他线程中断当前线程,或一定量的实时时间。

package org.ph.pc;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.pc
 * @project juc-api
 * @since 2022 -02 -23 21:23
 */
public class PcProject {
    public static void main(String[] args) {
        Number number=new Number();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    number.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"线程+").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    number.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"线程-").start();
    }

}

class Number {
    private int number = 0;

    /**
     *     放入两个线程,一个执行+1操作,一个执行-1操作
     */
    public synchronized void increment() throws InterruptedException {
        if (number != 0) {
            //等待
            this.wait();
        }
        number++;
        System.out.println(Thread.currentThread().getName() + "===>" + number);
        //通知其他线程 +1执行完毕
        this.notifyAll();
    }

    public synchronized void decrement() throws InterruptedException {
        if (number == 0) {
            //等待
            this.wait();
        }
        number--;
        System.out.println(Thread.currentThread().getName() + "===>" + number);
        //通知其他线程 +1执行完毕
        this.notifyAll();
    }
}

上述代码存在的问题

​ 增加线程后出现的问题,可能+操作执行多次

​ 问题出在if判断上,多个线程同时在执行+1,而if判断只判断一次;

  • 线程也可以唤醒,而不会被通知,中断或超时,即所谓的虚假唤醒 。 虽然这在实践中很少会发生,但应用程序必须通过测试应该使线程被唤醒的条件来防范,并且如果条件不满足则继续等待。 换句话说,等待应该总是出现在循环中,就像这样:

      synchronized (obj) {
             while (<condition does not hold>)
                 obj.wait(timeout);
             ... // Perform action appropriate to condition
         } 
    

改变后:

package org.ph.pc;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.pc
 * @project juc-api
 * @since 2022 -02 -23 21:23
 */
public class PcProject {
    public static void main(String[] args) {
        Number number = new Number();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    number.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "1号线程+").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    number.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "1号线程-").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    number.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "2号线程+").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    number.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "2号线程-").start();
    }

}

class Number {
    private int number = 0;

    /**
     * 放入两个线程,一个执行+1操作,一个执行-1操作
     */
    public synchronized void increment() throws InterruptedException {
//        if (number != 0) {
//            //等待
//            this.wait();
//        }
        while (number != 0) {
            this.wait();
        }
        number++;
        System.out.println(Thread.currentThread().getName() + "===>" + number);
        //通知其他线程 +1执行完毕
        this.notifyAll();
    }

    public synchronized void decrement() throws InterruptedException {
//        if (number == 0) {
//            //等待
//            this.wait();
//        }
        while (number == 0) {
            this.wait();
        }
        number--;
        System.out.println(Thread.currentThread().getName() + "===>" + number);
        //通知其他线程 +1执行完毕
        this.notifyAll();
    }


}

JUC

用到的接口和方法

  • Condition接口

Modifier and Type

Method and Description

void

await() 导致当前线程等到发信号或 interrupted

boolean

await(long time, TimeUnit unit) 使当前线程等待直到发出信号或中断,或指定的等待时间过去。

long

awaitNanos(long nanosTimeout) 使当前线程等待直到发出信号或中断,或指定的等待时间过去。

void

awaitUninterruptibly() 使当前线程等待直到发出信号。

boolean

awaitUntil(Date deadline) 使当前线程等待直到发出信号或中断,或者指定的最后期限过去。

void

signal() 唤醒一个等待线程。

void

signalAll() 唤醒所有等待线程。

package org.ph.pc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.pc
 * @project juc-api
 * @since 2022 -02 -23 21:23
 */
public class PcLockProject {
    public static void main(String[] args) {
        NumberLock number = new NumberLock();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                number.increment();
            }
        }, "1号线程+").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                number.decrement();
            }
        }, "1号线程-").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                number.increment();
            }
        }, "2号线程+").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                number.decrement();
            }
        }, "2号线程-").start();
    }
}

class NumberLock {
    private int number = 0;

    final Lock lock = new ReentrantLock();
    final Condition condition = lock.newCondition();

    /**
     * 放入两个线程,一个执行+1操作,一个执行-1操作
     */
    public void increment() {
        lock.lock();
        try {
            while (number != 0) {
                condition.await();
            }
            number++;
            System.out.println(Thread.currentThread().getName() + "===>" + number);
            //通知其他线程 +1执行完毕
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void decrement() {
        lock.lock();
        try {
            while (number == 0) {
                condition.await();
            }
            number--;
            System.out.println(Thread.currentThread().getName() + "===>" + number);
            //通知其他线程 +1执行完毕
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

此时使用Condition执行的结果线程是随机的 不可控的

package org.ph.lock;

import net.bytebuddy.asm.Advice;
import org.omg.PortableInterceptor.LOCATION_FORWARD;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.lock
 * @project juc-api
 * @since 2022 -02 -24 21:50
 */
public class SaleTicketCondition {

    public static void main(String[] args) {
        ConditionLock conditionLock = new ConditionLock();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                conditionLock.printlnA();
            }
        }, "线程A").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                conditionLock.printlnB();
            }
        }, "线程B").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                conditionLock.printlnC();
            }
        }, "线程C").start();
    }
}


/**
 * 资源内部类
 */
class ConditionLock {

    final Lock lock = new ReentrantLock();
    final Condition conditionA = lock.newCondition();
    final Condition conditionB = lock.newCondition();
    final Condition conditionC = lock.newCondition();

    private int number = 1;

    public void printlnA() {
        lock.lock();
        try {
            while (number != 1) {
                conditionA.await();
            }
            System.out.println(Thread.currentThread().getName() + "===printlnA执行");
            //唤醒指定的线程
            number = 2;
            conditionB.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }


    public void printlnB() {
        lock.lock();
        try {
            while (number != 2) {
                conditionB.await();
            }
            System.out.println(Thread.currentThread().getName() + "===printlnB执行");
            //唤醒指定的线程
            number = 3;
            conditionC.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void printlnC() {
        lock.lock();
        try {
            while (number != 3) {
                conditionC.await();
            }
            System.out.println(Thread.currentThread().getName() + "===printlnC执行");
            //唤醒指定的线程
            number = 1;
            conditionA.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

4,8锁现象

怎么判断锁的是谁? 什么是锁,锁的到底是什么

首先锁会锁两个东西,一个是new 出的对象,一个是Class模板(对象是由Class模板new 出的)

示例1:

​ 以下代码谁先执行?

​ 发短信先执行,但并不是因为先调用的原因

package org.ph.lock8;

import java.util.concurrent.TimeUnit;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.lock8
 * @project juc-api
 * @since 2022 -02 -24 22:36
 */
public class Lock8 {
    public static void main(String[] args) throws InterruptedException {
        Phone phone = new Phone();
        new Thread(() -> {
            phone.sendSms();
        }, "短信线程").start();

        //睡1秒
        TimeUnit.SECONDS.sleep(2);

        new Thread(() -> {
            phone.call();
        }, "电话线程").start();

    }
}


class Phone {
    public synchronized void sendSms() {
        System.out.println("短信发送了...");
    }

    public synchronized void call() {
        System.out.println("电话call出了...");
    }
}

为了验证,我们在发短信前睡眠4秒钟,远远大于线程调用时的睡眠1秒

​ 而此时,执行是睡眠1秒后,在睡眠4秒,继续执行发短信-打电话。

为什么呢?

​ 这是因为有的存在

关键点在于synchronized

synchronized修饰方法的时候,锁的是这个方法的调用者,此时我们的调用者是Phone对象,而我们的发短信和打电话这两个方法都被synchronized修饰,这两个方法都被Phone对象调用,所以此时这两个方法使用的是同一把锁,两个方法谁先拿到锁谁先执行

​ 锁是对象的 第一个线程拿到的是这个对象的锁 第二个线程需要的也是这个对象的锁 所以第一个线程执行方法时 锁还没有释放 第二个线程就拿不到锁 也就没办法执行

(睡眠使得)

package org.ph.lock8;

import java.util.concurrent.TimeUnit;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.lock8
 * @project juc-api
 * @since 2022 -02 -24 22:36
 * @说明 8锁的问题
 * 就是锁的8个问题
 *      此处使用synchronized
 *          修饰方法,锁的就是方法的调用者(调用对象)
 * 1.标准情况下,两个线程先执行那个方法? 此处:发短信,打电话
 *      结果,1.发短信 2.打电话
 *      为什么?首先不是先调用的原因
 *          是因为有锁存在,
 * 2.发短信方延迟4s,两个线程先执行那个方法? 此处:发短信,打电话
 *      结果,4s后 输出发短信,打电话
 *      原因,发短信是抱着锁延迟了4s,锁并没有释放
 *          两个方法是同一个对象调用的,也是同一把锁,谁先拿到,谁先执行
 */
public class Lock8 {
    public static void main(String[] args) throws InterruptedException {
        Phone phone = new Phone();
        for (int i = 0; i < 50; i++) {

            new Thread(() -> {
                phone.sendSms();
            }, "短信线程").start();

            //睡1秒
            TimeUnit.SECONDS.sleep(1);

            new Thread(() -> {
                phone.call();
            }, "电话线程").start();
        }

    }
}


class Phone {
    public synchronized void sendSms() {
        System.out.println("短信发送了...");
    }

    public synchronized void call() {
        System.out.println("电话call出了...");
    }
}

不同对象调用

package org.ph.lock8;

import java.util.concurrent.TimeUnit;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.lock8
 * @project juc-api
 * @since 2022 -02 -27 15:36
 * @说明
 *  3.增加一个普通的方法后,执行的流程是什么?
 *      结果:1.你好 2.发短信
 *          原因:hello方法不受锁的影响
 *  4.增加一个方法调用对象,调用打电话和发短信
 *      1对象调用短信,2对象调用打电话
 *          结果:1.打电话 2.发短信
 *          因为不同的调用对象不是用一把锁,1对象调用的发短信线程在阻塞,2对象调用的打电话没有阻塞
 *              所以结果先执行打电话,发短信延迟完毕后输出
 */
public class Lock8_b {
    public static void main(String[] args) throws InterruptedException {

        Phoneb phoneb1 = new Phoneb();
        Phoneb phoneb2=new Phoneb();

        new Thread(() -> {
            phoneb1.sendSms();
        }, "短信线程").start();

        //睡1秒
        TimeUnit.SECONDS.sleep(1);

        new Thread(() -> {
            phoneb2.call();
        }, "电话线程").start();

    }
}


class Phoneb {
    public synchronized void sendSms() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("短信发送了...");
    }

    public synchronized void call() {
        System.out.println("电话call出了...");
    }

    public void hello() {
        System.out.println("你好");
    }
}

静态方法

package org.ph.lock8;

import java.util.concurrent.TimeUnit;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.lock8
 * @project juc-api
 * @since 2022 -02 -27 15:51
 * @说名
 *  5.锁方法变为静态方法,同一个对象来调用,执行结果是什么?
 *      结果:1.发短信 2.打电话
 *      原因:static静态方法是类一加载就有了,锁的是Class模板。Phonec
 *  6.锁方法变为静态方法,不同的对象来调用,执行结果是什么?
 *      结果:1.发短信 2.打电话
 *      原因:static静态方法是类一加载就有了,锁的是Class模板,也就是此处的(整个Phonec对象)
 *  方法不加static锁的是调用对象,加了static锁的就是整个对象,全局唯一的一个Class模板。
 */
public class Lock8_c{
    public static void main(String[] args) throws InterruptedException {

        Phonec phoneb1 = new Phonec();
        Phonec phoneb2 = new Phonec();

        new Thread(() -> {
            phoneb1.sendSms();
        }, "短信线程").start();

        //睡1秒
        TimeUnit.SECONDS.sleep(1);

        new Thread(() -> {
            phoneb2.call();
        }, "电话线程").start();

    }
}


class Phonec {
    public static synchronized void sendSms() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("短信发送了...");
    }

    public  static  synchronized void call() {
        System.out.println("电话call出了...");
    }
}

静态方法和普通方法

package org.ph.lock8;

import java.util.concurrent.TimeUnit;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.lock8
 * @project juc-api
 * @since 2022 -02 -27 16:05
 * @说明
 *  7.一个静态方法和一个普通的synchronized修饰的方法
 *      同一个对象来调用这两个方法,执行的顺序是什么?
 *          结果:不加static的打电话方法先输出,在输出发短息方法
 *          原因:此时这两个方法已经不是用一个锁了,不需要等待另一个锁阻塞了
 *  8.一个静态方法和一个普通的synchronized修饰的方法
 *      不同一个对象来调用这两个方法,执行的顺序是什么?
 *          结果:先输出不加static方法,在输出static方法
 *          原因:静态方法走的是Class模板的锁,普通方法走的调用者的锁(模糊不定)
 */
public class Lock8_d{
    public static void main(String[] args) throws InterruptedException {

        Phoned phoneb1 = new Phoned();
        Phoned phoneb2 = new Phoned();

        new Thread(() -> {
            phoneb1.sendSms();
        }, "短信线程").start();

        //睡1秒
        TimeUnit.SECONDS.sleep(1);

        new Thread(() -> {
            phoneb2.call();
        }, "电话线程").start();

    }
}


class Phoned{
    public static synchronized void sendSms() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("短信发送了...");
    }

    public    synchronized void call() {
        System.out.println("电话call出了...");
    }
}

类锁和对象锁的区别

new new出来的对象是具体的 this(当前实例)

static Class 唯一的一个模板

​ 类的对象是模板,对象是类的实例。

静态代码块加锁,实例方法也要在静态代码块的类锁释放后才能执行。

5,集合

1.List

package org.ph.gather;

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.gather
 * @project juc-api
 * @since 2022 -02 -27 21:14
 */
@SuppressWarnings("all")
public class AList {
    public static void main(String[] args) {
        多线程下安全的ArrayList();
    }

    public static void 多线程下不安全的ArrayList() {
        /**
         * java.util.ConcurrentModificationException
         *  错误信息: 并发修改异常
         *      原因:并发下ArrayList是不安全的
         */
        List<String> list = new ArrayList();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString());
                System.out.println("list = " + list);
            }, String.valueOf(i)).start();
        }
    }

    public static void 多线程下安全的ArrayList() {
        //使用Collections调用安全的ArrayList
        //List<String> list = Collections.synchronizedList(new ArrayList<>());

        /**
         * 写入时复制
         *  CopyOnWriteArrayList对比Vector
         *      CopyOnWriteArrayList:使用lock锁,11-14?(优化) 返回使用Synchronized
         *      Vector:方法使用Synchronized,每个线程都要等待锁,同一时刻只有一个线程在执行
         *              Vector所有的操作都有锁,但是copyonwrite读操作是没有锁的!在开发中读比写更频繁,
         *              所以说copyonwrite好在这里,另外还有一个有点是数组扩容问题。cow不需要扩容
         */
        List<String> list = new CopyOnWriteArrayList<>();


        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString().substring(0, 5));
                System.out.println("list = " + list);
            }, String.valueOf(i)).start();
        }
    }
}

2.Set

package org.ph.gather;

import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.gather
 * @project juc-api
 * @since 2022 -02 -27 22:11
 */
public class ASet {
    public static void main(String[] args) {
        Set集合();

    }

    public static void Set集合() {
        /**
         *  不安全
         *  底层就是new了个HashMap();
         *      add()方法就是put()方法; map的key不可重复,所以set集合也不可重复
         */
//        Set<String> stringSet = new HashSet<>();
        /**
        *安全
        */
//        Set<String> stringSet = Collections.synchronizedSet(new HashSet<>());
        /**
        *安全
        */
        Set<String> stringSet = new CopyOnWriteArraySet<>();

        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                stringSet.add(UUID.randomUUID().toString().substring(0, 5));
                System.out.println("stringSet = " + stringSet);
            }, String.valueOf(i)).start();
        }
    }
}

3.Map

package org.ph.gather;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.gather
 * @project juc-api
 * @since 2022 -02 -28 20:33
 */
public class AMap {
    public static void main(String[] args) {
        map集合();
    }

    public static void map集合() {
//        Map<String, Object> map = new HashMap<>(16);
        /**
         * ConcurrentHashMap:
         *  
         */
        Map<String, Object> map = new ConcurrentHashMap<>(16);
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5));
                System.out.println("map = " + map);
            }, String.valueOf(i)).start();
        }

    }
}

6,Callable

Callable接口类似于Runnable,因为它们都是为期实例可能由另一个线程执行的类设计的。然而,Runnable不返回结果,也不抛出异常,Callable可以有返回值,也可以抛出异常

示例:

package org.ph.callablep;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.callablep
 * @project juc-api
 * @since 2022 -02 -28 21:04
 */
public class ICallable {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CallableTest callableTest = new CallableTest();
        //传入实现Callable<>泛型的类	适配器模式
        FutureTask futureTask = new FutureTask(callableTest);
        //放入
        new Thread(futureTask, "A").start();
        //得到返回值 这个地方程序如果耗时 会发生阻塞 因为要等待结果返回 所以放在程序的最后 OR 异步通信
        Object o = futureTask.get();
        System.out.println("o.toString() = " + o.toString());

    }

}


class CallableTest implements Callable<String> {

    @Override
    public String call() throws Exception {
        return "调用Call";
    }
}

7,常用辅助类

1,CountDownLatch

允许一个或多个线程等待直到其他线程中执行的一组操作完成的同步辅助

package org.ph.countutil;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.countutil
 * @project juc-api
 * @since 2022 -03 -03 21:21
 * 计数器
 */
public class CountDownLatchProject {
    /**
     * 飞机人数齐了就起飞的示例
     *
     * @param args
     */
    public static void main(String[] args) throws InterruptedException {
        Aircraft aircraft = new Aircraft();
        aircraft.init();
    }
}


class Aircraft {
    private int number = 12;

    Lock lock = new ReentrantLock();

    public void init()  {
        lock.lock();
        try {
            //入参    机票人数
            CountDownLatch countDownLatch = new CountDownLatch(number);
            for (int i = 0; i <= number; i++) {
                new Thread(() -> {
                    System.out.println("座位号:" + Thread.currentThread().getName() + "人员已到位");
                    //此时 空姐拿着计数器来查人数
                    countDownLatch.countDown();
                    System.out.println(countDownLatch.toString());
                }, String.valueOf(i)).start();
            }
            //查完之后计数器清零 准备起飞
            countDownLatch.await();
            System.out.println("✈起飞✈✈✈✈✈✈✈✈✈✈✈✈✈");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

2,CyclicBarrier

package org.ph.countutil;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.countutil
 * @project juc-api
 * @since 2022 -03 -14 21:21
 */
public class IcyclicBarrier {
    public static void main(String[] args) {
        /**
         * 只有当线程执行到入参数量后才会执行线程
         */
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
            System.out.println("乘客已满,出发");
        });

        for (int i = 1; i <=7; i++) {
            final  int tempI = i;
            new Thread(()->{
                System.out.println("第"+tempI+"位乘客:"+Thread.currentThread().getName()+"上车");
                try {
                    //等待 到7之后执行 cyclicBarrier
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

3,Semaphore

Semaphore :信号量

​ 一个计数信号量,在概念上,信号量维持一组许可证。如果有必要,没个acquire()都会阻塞,知道许可证可用,然后才能使用它。每个release()添加许可证,潜在的释放阻塞获取方。但是,没有使用实际的许可证对象,Semaphore只保留可用数量的计数,并相应执行

package org.ph.countutil;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.countutil
 * @project juc-api
 * @since 2022 -03 -14 21:41
 */
public class ISemaphore {
    public static void main(String[] args) {
        //一般用于限流
        Semaphore semaphore = new Semaphore(4);

        /**
         * acquire:
         *  得到  如果已经到达限位则等待,知道释放为止
         * release:
         *  释放  将当前信号量释放+1,然后唤醒等待的线程
         *  使用场景
         *      多个共享资源互斥的使用
         *      并发限流,控制最大的线程数
         */
        for (int i = 1; i <= 7; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "红灯等待");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + "绿灯通过");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }, String.valueOf(i)).start();
        }

    }
}

8,读写锁

package org.ph.rwlock;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.rwlock
 * @project juc-api
 * @since 2022 -03 -14 22:04
 */
public class IReadWriteLock {

    public static void cacheNoLockStart() {
        final ICacheNoLock iCache = new ICacheNoLock();
        //多线程写入
        for (int i = 0; i < 5; i++) {
            final int tempI = i;
            new Thread(() -> {
                iCache.put(tempI + "", tempI + "");
            }, String.valueOf(tempI)).start();
        }

        //多线程读取
        for (int i = 0; i < 5; i++) {
            final int tempI = i;
            new Thread(() -> {
                iCache.get(tempI + "");
            }, String.valueOf(tempI)).start();
        }
    }

    public static void cacheLockStart() {
        final ICacheLock iCache = new ICacheLock();
        //多线程写入
        for (int i = 0; i < 5; i++) {
            final int tempI = i;
            new Thread(() -> {
                iCache.lockPut(tempI + "", tempI + "");
            }, String.valueOf(tempI)).start();
        }

        //多线程读取
        for (int i = 0; i < 5; i++) {
            final int tempI = i;
            new Thread(() -> {
                iCache.lockGet(tempI + "");
            }, String.valueOf(tempI)).start();
        }
    }

    public static void main(String[] args) {
        cacheLockStart();
    }
}


class ICacheNoLock {
    /**
     * 保证原子性
     */
    private volatile Map<String, Object> map = new HashMap<>();

    public void put(String key, Object value) {
        System.out.println(Thread.currentThread().getName() + "写入" + key);
        map.put(key, value);
        System.out.println(Thread.currentThread().getName() + "写入" + key + "完毕");
    }

    public Object get(String key) {
        System.out.println(Thread.currentThread().getName() + "读取" + key);
        Object o = map.get(key);
        System.out.println(Thread.currentThread().getName() + "读取" + key + "完毕");
        return o;
    }
}


class ICacheLock {
    /**
     * 保证原子性
     */
    private volatile Map<String, Object> map = new HashMap<>();
    /**
     * 创建lock锁
     * 读写锁 更加细粒度的
     */
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /**
     * 写入 希望写入的时候 只有一个线程同时在写入
     * 也称为独占锁   一次只能有一个线程占有
     *
     * @param key
     * @param value
     */
    public void lockPut(String key, Object value) {
        readWriteLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "(读写锁介入)写入" + key);
            map.put(key, value);
            System.out.println(Thread.currentThread().getName() + "(读写锁介入)写入" + key + "完毕");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    /**
     * 读取的时候 所有人都可读取
     * 也称为共享锁   多个线程可以同时占有
     *
     * @param key
     * @return
     */
    public Object lockGet(String key) {
        readWriteLock.readLock().lock();
        Object o = null;
        try {
            System.out.println(Thread.currentThread().getName() + "(读写锁介入)读取" + key);
            o = map.get(key);
            System.out.println(Thread.currentThread().getName() + "(读写锁介入)读取" + key + "完毕");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.readLock().unlock();
        }
        return o;
    }
}

9.阻塞队列

阻塞队列

类图

可以看出BlockKingQueue是和Set,List同级,并且实现类也都差不多

阻塞队列一般使用在多线程并发处理和线程池

1.阻塞队列的使用

主要操作有相对列中添加和移除,以及4组API

方式

抛出异常

有返回值

阻塞 等待

超时等待

添加

add()

offer()

put()

offer(“xx”,2, TimeUnit.SECONDS)

移除

remove()

poll()

take()

poll(2, TimeUnit.SECONDS)

判断队列首

element()

peek()

-

-

添加和移除

package org.ph.queue;

import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.queue
 * @project juc-api
 * @since 2022 -03 -20 22:16
 */
public class Rest {

    public static void main(String[] args) throws InterruptedException {
        arrayBlockQueueTimeOut();
    }

    /**
     * 抛出异常的
     */
    public static void arrayBlockQueue() {
        //创建时需要初始化队列的大小
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        arrayBlockingQueue.add("李");
        arrayBlockingQueue.add("王");
        arrayBlockingQueue.add("张");

        /**
         * 当队列满足初始化的大小时,继续添加就会抛出异常
         *  java.lang.IllegalStateException: Queue full
         */
//        arrayBlockingQueue.add("赵");

        /**
         * 取出队列元素
         * 当元素被取完的时候再取就会java.util.NoSuchElementException
         */
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());

        System.out.println(arrayBlockingQueue.remove());
    }


    /**
     * 有返回值的
     */
    public static void arrayBlockQueueOffer(){
        //创建时需要初始化队列的大小
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        /**
         * offer() 不会抛出异常 而是返回false
         */
        System.out.println(arrayBlockingQueue.offer("李"));
        System.out.println(arrayBlockingQueue.offer("王"));
        System.out.println(arrayBlockingQueue.offer("张"));

        System.out.println(arrayBlockingQueue.offer("孙"));

        System.out.println(arrayBlockingQueue.element());


        /**
         * poll() 不抛出异常 会返回null
         */

        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue.poll());
    }


    /**
     * 等待 阻塞(一直等待)
     */
    public static void arrayBlockQueueWait() throws InterruptedException {
        //创建时需要初始化队列的大小
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        /**
         * put()
         */
        arrayBlockingQueue.put("李");
        arrayBlockingQueue.put("王");
        arrayBlockingQueue.put("张");

        /**
         * 队列满了以后 再添加 就会一直处于等待状态
         */
//        arrayBlockingQueue.put("赵");

        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());

//        System.out.println(arrayBlockingQueue.take());

    }

    /**
     * 等待 阻塞(等待超时)
     */
    public static void arrayBlockQueueTimeOut() throws InterruptedException {
        //创建时需要初始化队列的大小
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        /**
         * put()
         */
        arrayBlockingQueue.offer("李");
        arrayBlockingQueue.offer("王");
        arrayBlockingQueue.offer("张");

        /**
         * 队列满了以后 我们可以设置等待时间和单位 这是就会等待两秒钟 超时后退出
         */
        arrayBlockingQueue.offer("赵",2, TimeUnit.SECONDS);

        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue.poll());

        //查过2秒钟后 超时后 不再取 输出null
        System.out.println(arrayBlockingQueue.poll(2,TimeUnit.SECONDS));
    }
}

2.同步队列

SynchronousQueue同步队列:没有容量概念(存进去一个元素,就必须等待这个元素被消费后才能存下一个元素)

示例后续补充

10,线程池

重点:三大方法,七大参数,四种拒绝策略

使用线程池的好处:

​ 降低资源的消耗,提高响应的速度,方便管理,线程可复用,可以控制最大的并发数

1,三大方法

阿里规范开发:

【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式能让其他人更加明确线程池的运行规则, 规避资源耗尽的风险、
说明:Executors返回的线程池对象的弊端如下:
1,FixedThreadPool和SingleThreadPool:
允许的请求队列长度为Integer.MAX_VALUE, 可能会堆积大量的请求,从而导致OOM

​ 2,CacheedThreadPool和ScheduleThreadPool:
允许的创建线程池数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM

package org.ph.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.pool
 * @project juc-api
 * @since 2022 -03 -21 21:20
 */
public class ExecutonrsI {
    public static void main(String[] args) {
        //单个线程
//        ExecutorService executorService = Executors.newSingleThreadExecutor();
        //指定大小
//        ExecutorService executorService = Executors.newFixedThreadPool(5);
        //弹性线程
        ExecutorService executorService = Executors.newCachedThreadPool();

        try {
            for (int i = 0; i < 100; i++) {
                executorService.execute(() -> {
                    System.out.println(Thread.currentThread().getName());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
}

2,七大参数

首先看下三大方法

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor (0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}	

/**
*	现在可以看出 最终都是使用的ThreadPoolExecutor只不过是入参不同
* 所以开启线程只是调用了 ThreadPoolExecutor 传入不同的参数来应对不同的场景
*/
   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
 }

//this		这段代码就是ThreadPoolExecutor的源代码
public ThreadPoolExecutor(int corePoolSize,								   //核心线程数
                              int maximumPoolSize,				   //最大线程数
                              long keepAliveTime,					  //空闲线程存活时间
                              TimeUnit unit,							//超时单位
                              BlockingQueue<Runnable> workQueue,	   //阻塞队列
                              ThreadFactory threadFactory,				 //线程工厂
                              RejectedExecutionHandler handler) {	//线程拒绝策略
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
}

例子:银行办理业务

​ 平常银行只有几个窗口进行办理业务,不会全部开放(核心线程数) 当有人来办理业务时,就去窗口进行办理,窗口都在办理中的话,随着办理业务的人越来越多,剩余的窗口也需要开放来办理业务(最大线程数)。再来人就会到后候客区等待(此时的候客区就好比是阻塞队列);当再来人的话,此时候客区满员,窗口也在办理中,那么这个人要么在外面等,要么走(拒绝策略)

例子

3,手动创建线程池和四种拒绝策略

四种拒绝策略

1,AbortPolicy

package org.ph.pool;

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.pool
 * @project juc-api
 * @since 2022 -03 -21 22:45
 */
public class ThreadExecutorPollAbortPolicy {
    public static void main(String[] args) {
        /**
         *         创建线程池
         *  核心线程数 2
         *  最大线程数 5
         *  超时等待时间 5
         *  超时等待时间单位 秒
         *  队列: 候客区,这个地方满了之后就会触发最大线程数
         *  线程工厂 一般默认
         *  拒绝策略    AbortPolicy:默认的:核心线程最大线程队列都满了之后,就不处理再进来的,此时就会抛出异常
         *
         */
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                2,
                5,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        try {
            /**
             * 最大线程池数量=最大线程数+队列数,再大就会执行拒绝策略,此处抛出异常
             *          Task org.ph.pool.ThreadExecutorPollAbortPolicy$$Lambda$1/939047783@2e0fa5d3 rejected from
             */
            for (int i = 0; i < 9; i++) {
                executorService.execute(() -> {
                    System.out.println(Thread.currentThread().getName());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
}

2,CallerRunsPolicy

/**
 *         创建线程池
 *  核心线程数 2
 *  最大线程数 5
 *  超时等待时间 5
 *  超时等待时间单位 秒
 *  队列: 候客区,这个地方满了之后就会触发最大线程数
 *  线程工厂 一般默认
 *  拒绝策略    CallerRunsPolicy: 哪里来的哪里执行
 *
 */
ThreadPoolExecutor executorService = new ThreadPoolExecutor(
        2,
        5,
        10,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(3),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.CallerRunsPolicy());

try {
    /**
     * 最大线程池数量=最大线程数+队列数,再大就会劝退,谁调用的谁执行
     *
     */
    for (int i = 0; i < 10; i++) {
        executorService.execute(() -> {
            System.out.println(Thread.currentThread().getName());
        });
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    executorService.shutdown();
}

3,DiscardPolicy

package org.ph.pool;

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.pool
 * @project juc-api
 * @since 2022 -03 -21 22:54
 */
public class ThreadExecutorPollDiscardPolicy {
    public static void main(String[] args) {
        /**
         *         创建线程池
         *  核心线程数 2
         *  最大线程数 5
         *  超时等待时间 5
         *  超时等待时间单位 秒
         *  队列: 候客区,这个地方满了之后就会触发最大线程数
         *  线程工厂 一般默认
         *  拒绝策略    DiscardPolicy: 队列满了之后不抛出异常,而是丢弃当前任务
         *
         */
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                2,
                5,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy());

        try {
            for (int i = 0; i < 10; i++) {
                executorService.execute(() -> {
                    System.out.println(Thread.currentThread().getName());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
}

4,DiscardOldestPolicy

package org.ph.pool;

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.pool
 * @project juc-api
 * @since 2022 -03 -21 23:00
 */
public class ThreadExecutorPollDiscardOldestPolicy {
    public static void main(String[] args) {
        /**
         *         创建线程池
         *  核心线程数 2
         *  最大线程数 5
         *  超时等待时间 5
         *  超时等待时间单位 秒
         *  队列: 候客区,这个地方满了之后就会触发最大线程数
         *  线程工厂 一般默认
         *  拒绝策略    DiscardOldestPolicy: 将任务队列最老的任务丢弃,并尝试再次提交新的任务
         *              队列首部被poll出去,然后再调用execute方法尝试进入队列
         */
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                2,
                5,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        try {
            for (int i = 0; i < 9; i++) {
                final int temp = i;
                executorService.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " OK");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
}

4,CPU密集型和IO密集型

CPU密集型:设置最大线程是你的CPU核心数,保证CPU效率最高

IO:判断你的程序最好资源的IO线程 两倍

Runtime.getRuntime().availableProcessors()	//最大线程不设定死,让CPU核心数来判断 

11,四大函数式接口

lambda表达式,链式编程,函数式接口,Stream流式计算

1,函数式接口

只有一个方法的接口

@FunctionalInterface
public interface Runnable {
    /**
     * 示例 
     */
    public abstract void run();
}

示例:

package org.ph.lambda;

import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.lambda
 * @project juc-api
 * @since 2022 -03 -22 20:51
 */
public class FunctionI {
    public static void main(String[] args) {
        supplier();
    }

    public static void function() {
        Function function = str -> str;
        System.out.println(function.apply("Dsa"));
    }

    /**
     * 断定型接口
     */
    public static void predicate() {
        Predicate predicate = flg -> (boolean) flg;
        System.out.println("predicate.test(1==1) = " + predicate.test(1 == 2));
    }

    //消费型接口
    public static void consumer() {
        Consumer consumer = System.out::println;
        consumer.accept("卧槽");
    }

    //供给型接口
    public static void supplier() {
        Integer ss = 22;
        Supplier supplier = () -> ss;
        System.out.println(supplier.get());
    }

}

12,Stream流计算

package org.ph.lambda;

import org.omg.CORBA.INTERNAL;
import org.ph.lambda.pojo.User;

import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.lambda
 * @project juc-api
 * @since 2022 -03 -22 21:17
 */
public class StreamI {
    public static void main(String[] args) {

        User user1 = new User(1, "a小明", 22);
        User user2 = new User(2, "b狗蛋儿", 12);
        User user3 = new User(3, "c铁柱", 28);
        User user4 = new User(4, "d翠花", 23);
        List<User> users = Arrays.asList(user1, user2, user3, user4);

        /**
         *         filter 过滤 年龄是偶数的
         *         map 映射 处理单个字段
         *         sorted 排序 o1和o2比较大小进行排序
         */
        users.stream()
                .filter(u -> u.getAge() % 2 == 0)
                .map(user -> user.getName().toUpperCase())
                .sorted((o1, o2) -> o2.compareTo(o1))
                .limit(2)
                .forEach(System.out::println);
    }

}

13,ForkJoin

并行执行任务,大数据量下提高效率

ForkJoin

就是把大任务拆分成小任务,最后子任务结果向上返,生成最终结果(分治算法)

ForkJoin的特点:工作窃取

​ 例如两条线程,一条执行的比较快,执行完后就会去把另一条线程没有执行完的任务偷过来继续执行。

package org.ph.forkjoin;

import java.util.concurrent.RecursiveTask;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.forkjoin
 * @project juc-api
 * @since 2022 -03 -22 22:52
 */
public class ForkJoinAdd extends RecursiveTask<Long> {
    private long start;
    private long end;

    public ForkJoinAdd(long start, long end) {
        this.start = start;
        this.end = end;
    }

    //临界值
    private long middis = 10000;

    @Override
    protected Long compute() {
        if ((end - start) < middis) {
            long sum = 0;
            long i = 1;
            for (i = start; i <=end; i++) {
                sum += i;
            }
//            System.out.println("普通循环计算完毕:" + i + "次,结果计算到:" + sum);
            return sum;
        } else {
            //走分支合并计算
            //中间值
            long middle = (start + end) / 2;
            ForkJoinAdd forkTask1 = new ForkJoinAdd(start, middle);
            forkTask1.fork();
            ForkJoinAdd forkTask2 = new ForkJoinAdd(middle + 1, end);
            forkTask2.fork();
//            System.out.println("fork计算:" + forkTask1.join() + ":" + forkTask2.join());
            return forkTask1.join() + forkTask2.join();
        }
    }
}

测试类

package org.ph.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

/**
 * <p>
 *
 * </p>
 *
 * @author lph
 * @package org.ph.forkjoin
 * @project juc-api
 * @since 2022 -03 -22 23:09
 */
public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        forkJoinAdd();
    }

    public static void forkJoinAdd() throws ExecutionException, InterruptedException {
        long startTime = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> forkJoinAdd = new ForkJoinAdd(0L, 10_0000_0000);
        ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinAdd);
        long aLong = submit.get();
        long endTime = System.currentTimeMillis();
        System.out.println("计算时间:" + (endTime - startTime) + "aLong = " + aLong);
    }

    public static void streamLong() {
        long startTime = System.currentTimeMillis();
        long reduce = LongStream.rangeClosed(0L, 10_0000_0000).parallel().reduce(0, Long::sum);
        long endTime = System.currentTimeMillis();
        System.out.println("计算时间:" + (endTime - startTime) + ";aLong = " + reduce);
    }
}