Java Multithreading Basics [Crawler Project in Practice] (A Practical Guide to Java Multithreaded Programming)
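This article introduces the problems that multithreading brings and the basic solutions:
Data errors caused by race conditions
Deadlock: how it works, how to diagnose it, and how to prevent it
Thread safety: synchronized methods and the concurrency toolkit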
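Review of multithreading fundamentals
Why do we need multithreading? CPUs are extremely fast, and modern CPUs all have multiple cores, so a single thread leaves much of that capacity unused.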
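```java
package com.github.hcsp.io;

public class Crawler {
    // Shared by all threads and deliberately left unsynchronized
    public static int i = 0;

    public static void main(String[] args) {
        for (int j = 0; j < 1000; j++) {
            new Thread(() -> {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // ++i is read-modify-write, not atomic: values may repeat and
                // the last number printed may be less than 1000
                System.out.println(++i);
            }).start();
        }
    }
}
```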
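Data errors caused by race conditions
Typical data-error patterns:
i++ (read, add 1, write back)
if-then-do (check, then act)
In multithreaded code, your first reaction to any shared object should be to ask whether it is thread-safe.
Search its documentation for "thread"; the HashMap Javadoc, for example, notes that the implementation is not synchronized. A classic consequence is the HashMap "deadlock", which is really an infinite loop.
Example: the check-then-act below is broken because, with multiple threads, nothing guarantees that containsKey followed by put runs atomically.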
```java
package com.github.hcsp.io;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class Main {
    public static int i = 0;
    private static Map<Integer, Integer> map = new HashMap<>();

    public static void main(String[] args) throws InterruptedException {
        for (int j = 0; j < 1000; j++) {
            new Thread(Main::putIfAbsent).start();
        }
    }

    private static void putIfAbsent() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Generate a random number from 0 to 9; if it is not in the map, add it
        int r = new Random().nextInt(10);
        // Check-then-act: two threads can both pass the check before either one puts,
        // so "Put r" may be printed more than once for the same r
        if (!map.containsKey(r)) {
            map.put(r, r);
            System.out.println("Put " + r);
        }
    }
}
```
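Deadlock in detail: diagnosis and prevention
The famous HashMap infinite-loop problem
Writing code that reproduces a deadlock
Diagnosing deadlocks
A classic multithreading problem: the dining philosophers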
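The dining philosophers problem can be sketched as follows. This is a minimal illustration rather than the course's own code; the class `DiningPhilosophers` and the method `dine` are hypothetical names. Each philosopher needs the two forks on either side. If everyone grabbed the left fork first, all of them could end up holding one fork and waiting forever. Here every philosopher picks up the lower-numbered fork first, which imposes a single global lock order, the same principle used later to prevent deadlock.

```java
public class DiningPhilosophers {
    // Runs the simulation and returns the total number of meals eaten.
    // If the method returns at all, no deadlock occurred.
    public static int dine(int philosophers, int mealsEach) throws InterruptedException {
        Object[] forks = new Object[philosophers];
        for (int i = 0; i < philosophers; i++) {
            forks[i] = new Object();
        }
        int[] meals = new int[philosophers]; // slot p is written only by philosopher p
        Thread[] threads = new Thread[philosophers];
        for (int p = 0; p < philosophers; p++) {
            int id = p;
            int left = p;
            int right = (p + 1) % philosophers;
            // Global order: always lock the lower-numbered fork first
            Object first = forks[Math.min(left, right)];
            Object second = forks[Math.max(left, right)];
            threads[p] = new Thread(() -> {
                for (int m = 0; m < mealsEach; m++) {
                    synchronized (first) {
                        synchronized (second) {
                            meals[id]++; // "eating" while holding both forks
                        }
                    }
                }
            });
            threads[p].start();
        }
        int total = 0;
        for (int p = 0; p < philosophers; p++) {
            threads[p].join(); // join() also makes meals[p] visible to this thread
            total += meals[p];
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(dine(5, 1000)); // 5000
    }
}
```

With this ordering, the last philosopher takes fork 0 before fork 4, which breaks the circular wait.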
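The synchronized keyword
Put simply, it is a lock.
Example: two threads take the same two locks in opposite order and deadlock: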
```java
package com.github.hcsp.io;

public class Main {
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();

    public static void main(String[] args) throws InterruptedException {
        new Thread1().start();
        new Thread2().start();
    }

    // Acquires lock1, then lock2
    static class Thread1 extends Thread {
        @Override
        public void run() {
            synchronized (lock1) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (lock2) {
                    System.out.println("1");
                }
            }
        }
    }

    // Acquires lock2, then lock1: the opposite order, so each thread ends up
    // holding the lock the other one needs, and neither ever prints
    static class Thread2 extends Thread {
        @Override
        public void run() {
            synchronized (lock2) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (lock1) {
                    System.out.println("2");
                }
            }
        }
    }
}
```
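How to diagnose a deadlock
First, find the process ID.
You can list processes with ps, but that is tedious; it is easier to use jps, a tool that ships with the JDK.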
```shell
ps aux | grep java
```
Once you have the process ID (91790 in this example), use jstack to dump the thread stacks to a file:
```shell
jstack 91790 > output.txt
```
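This command is also useful when a program runs very slowly. An object used as a lock in a synchronized block is called a monitor, which is the term the thread dump uses; when jstack detects a Java-level deadlock, it reports it at the end of the dump.
In real projects the situation can be much more complex, and you may have to go through the jstack output thread by thread.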
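Besides jstack, the JDK's `java.lang.management` API can look for deadlocks from inside a running program. The sketch below (the class `DeadlockDetector` and its methods are hypothetical) recreates the two-lock deadlock from the example above using daemon threads, then asks `ThreadMXBean.findDeadlockedThreads()` which threads are stuck. It is only an illustration and does not replace reading a full thread dump.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

public class DeadlockDetector {
    // Recreates the lock1/lock2 deadlock, then returns the IDs of deadlocked threads
    public static long[] reproduceAndDetect() throws InterruptedException {
        Object lock1 = new Object();
        Object lock2 = new Object();
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);
        Thread t1 = new Thread(() -> lockBoth(lock1, lock2, bothHoldFirstLock), "Thread1");
        Thread t2 = new Thread(() -> lockBoth(lock2, lock1, bothHoldFirstLock), "Thread2");
        // Daemon threads do not keep the JVM alive after main() returns
        t1.setDaemon(true);
        t2.setDaemon(true);
        t1.start();
        t2.start();

        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long[] ids = null;
        // Poll for up to about 5 seconds; findDeadlockedThreads() returns null while none exist
        for (int attempt = 0; attempt < 50 && ids == null; attempt++) {
            Thread.sleep(100);
            ids = mx.findDeadlockedThreads();
        }
        return ids == null ? new long[0] : ids;
    }

    private static void lockBoth(Object first, Object second, CountDownLatch latch) {
        synchronized (first) {
            latch.countDown();
            try {
                latch.await(); // make sure the other thread already holds its first lock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            synchronized (second) { // never acquired: the other thread holds it
                System.out.println("unreachable");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        for (ThreadInfo info : mx.getThreadInfo(reproduceAndDetect())) {
            System.out.println(info.getThreadName() + " is waiting for " + info.getLockName()
                    + " held by " + info.getLockOwnerName());
        }
    }
}
```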
The principle for preventing deadlock
All threads acquire the locks on resources in the same order.
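Applying that principle to the deadlock example gives the following sketch. The names `LockOrdering`, `lockInOrder`, and `run` are hypothetical. Both threads keep the original sleep times, but both now take `lock1` before `lock2`. Neither thread can hold `lock2` while waiting for `lock1`, so the wait cycle cannot form and both threads finish.

```java
public class LockOrdering {
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();

    // Every caller takes lock1 first, then lock2
    static void lockInOrder(String name, long holdMillis, StringBuffer log) {
        synchronized (lock1) {
            try {
                Thread.sleep(holdMillis); // same sleeps as the deadlock example
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            synchronized (lock2) {
                log.append(name);
            }
        }
    }

    // Returns "12" or "21", depending on which thread got lock1 first
    public static String run() throws InterruptedException {
        StringBuffer log = new StringBuffer(); // StringBuffer's methods are synchronized
        Thread t1 = new Thread(() -> lockInOrder("1", 500, log));
        Thread t2 = new Thread(() -> lockInOrder("2", 100, log));
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```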
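Thread safety: synchronized methods and the concurrency toolkit
Basic ways to achieve thread safety
First: immutable classes, such as Integer, String, ...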
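Take String S = "abc": S holds a reference (an address), while the actual data "abc" lives on the heap. The String object itself never changes, but the variable can be pointed at a different String object.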
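A minimal sketch of the immutable-class approach, using a hypothetical `ImmutablePoint` class: all fields are final, there are no setters, and "modifying" returns a new object. This mirrors how String methods such as `toUpperCase()` return a new String and leave the original untouched. Once constructed, such an object can be shared between threads without locking.

```java
// Hypothetical example class: final class, final fields, no setters
public final class ImmutablePoint {
    private final int x;
    private final int y;

    public ImmutablePoint(int x, int y) {
        this.x = x;
        this.y = y;
    }

    public int getX() {
        return x;
    }

    public int getY() {
        return y;
    }

    // "Changing" x creates a new object; this one is never modified
    public ImmutablePoint withX(int newX) {
        return new ImmutablePoint(newX, y);
    }

    public static void main(String[] args) {
        String s = "abc";
        String upper = s.toUpperCase();
        System.out.println(s + " " + upper); // abc ABC

        ImmutablePoint p = new ImmutablePoint(1, 2);
        ImmutablePoint q = p.withX(10);
        System.out.println(p.getX() + " " + q.getX()); // 1 10
    }
}
```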
Second basic way to achieve thread safety: synchronization
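synchronized (someObject): uses that object as the lock
static synchronized methods: a static method is not bound to any object, so the JVM implicitly locks the Class object (for example, Main.class)
synchronized instance methods: use the instance itself (this) as the lock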
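To make the static case concrete, here is a small sketch in which the class `StaticLockDemo` and its methods are hypothetical. A `static synchronized` method and a `synchronized (StaticLockDemo.class)` block lock the same Class object. They therefore exclude each other, and no increments are lost even though two threads use different entry points.

```java
public class StaticLockDemo {
    private static int count = 0;

    // Implicitly locks StaticLockDemo.class
    private static synchronized void incrementWithMethod() {
        count++;
    }

    // Explicitly locks the same Class object, so it excludes incrementWithMethod()
    private static void incrementWithBlock() {
        synchronized (StaticLockDemo.class) {
            count++;
        }
    }

    public static int run(int perThread) throws InterruptedException {
        synchronized (StaticLockDemo.class) {
            count = 0;
        }
        Thread a = new Thread(() -> {
            for (int k = 0; k < perThread; k++) {
                incrementWithMethod();
            }
        });
        Thread b = new Thread(() -> {
            for (int k = 0; k < perThread; k++) {
                incrementWithBlock();
            }
        });
        a.start();
        b.start();
        a.join();
        b.join();
        synchronized (StaticLockDemo.class) {
            return count;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(100_000)); // 200000
    }
}
```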
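synchronized blocks
What exactly does a synchronized block synchronize on?
Collections.synchronizedXxx wrappers (for example, Collections.synchronizedMap)
Demo 1: a synchronized block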
```java
package com.github.hcsp.io;

public class Main {
    private static int i = 0;
    private static final Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        for (int j = 0; j < 1000; j++) {
            new Thread(Main::modifySharedVariable).start();
        }
    }

    private static void modifySharedVariable() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Only one thread at a time can hold lock, so no increment is lost
        synchronized (lock) {
            i += 1;
            // Read i inside the lock too, so each thread prints the value it produced
            System.out.println(i);
        }
    }
}
```
Can it be simpler? Declare the method itself static synchronized:
```java
package com.github.hcsp.io;

public class Main {
    private static int i = 0;
    private static final Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        for (int j = 0; j < 1000; j++) {
            new Thread(Main::modifySharedVariable).start();
        }
    }

    // static synchronized: the whole method locks Main.class instead of the explicit lock.
    // Note that the sleep now also runs while holding the lock, so threads run one by one.
    private synchronized static void modifySharedVariable() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // synchronized (lock) {
        //     i += 1;
        // }
        i += 1;
        System.out.println(i);
    }
}
```
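A synchronized instance method uses that instance as the lock
Summary: adding synchronized puts a lock on the code, which prevents multiple threads from running it at the same time; at any moment only one thread can execute code guarded by the same lock.
Related code: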
```java
package com.github.hcsp.io;

public class Main {
    private int i = 0;

    public static void main(String[] args) throws InterruptedException {
        Main instance = new Main();
        for (int j = 0; j < 1000; j++) {
            new Thread(instance::modifySharedVariable).start();
        }
    }

    // Locks this instance (the object the method is called on)
    private synchronized void modifySharedVariable() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        i += 1;
        System.out.println(i);
    }

    // Functionally equivalent to the method above
    private void modifySharedVariable1() {
        synchronized (this) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            i += 1;
            System.out.println(i);
        }
    }
}
```
Collections you use constantly, in interviews and in daily work alike:
List -> ArrayList
Set -> HashSet, TreeSet
Map -> HashMap, LinkedHashMap
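None of the above are thread-safe. As a rule, a class that does not explicitly state that it is thread-safe is not thread-safe.
Demonstrating thread-unsafety: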
```java
package com.github.hcsp.io;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class Main {
    private static final Map<Integer, Integer> map = new HashMap<>();

    public static void main(String[] args) throws InterruptedException {
        for (int j = 0; j < 1000; j++) {
            new Thread(Main::concurrentlyAccess).start();
        }
    }

    private static void concurrentlyAccess() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Integer r = new Random().nextInt();
        // HashMap is not thread-safe: concurrent put() calls can corrupt it, and
        // iterating while another thread modifies it may throw ConcurrentModificationException
        map.put(r, r);
        for (Integer i : map.keySet()) {
            System.out.println(i);
        }
    }
}
```
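One fix is to wrap the map with Collections.synchronizedMap. Note that iterating over one of its views, such as keySet(), still requires manually synchronizing on the map.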
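A minimal sketch of the `Collections.synchronizedMap` fix, reworking the demo above. The class name `SynchronizedMapDemo` and the method `run` are hypothetical, and each thread prints how many keys it saw instead of printing every key, to keep the output short. Each single call on the wrapper is synchronized, but a loop over `keySet()` is many calls in a row, so the `Collections.synchronizedMap` Javadoc requires holding the map's own lock for the whole iteration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class SynchronizedMapDemo {
    private static void concurrentlyAccess(Map<Integer, Integer> map) {
        Integer r = new Random().nextInt();
        map.put(r, r); // a single call: synchronized by the wrapper
        int seen = 0;
        // Hold the map's own lock for the whole loop, as the Javadoc requires
        synchronized (map) {
            for (Integer key : map.keySet()) {
                seen++;
            }
        }
        System.out.println(Thread.currentThread().getName() + " saw " + seen + " keys");
    }

    // Returns the final map size: at most `threads`, since random keys may collide
    public static int run(int threads) throws InterruptedException {
        Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>());
        List<Thread> workers = new ArrayList<>();
        for (int j = 0; j < threads; j++) {
            Thread t = new Thread(() -> concurrentlyAccess(map));
            t.start();
            workers.add(t);
        }
        for (Thread t : workers) {
            t.join();
        }
        return map.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Final size: " + run(100));
    }
}
```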
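Third basic way to achieve thread safety: the JUC package (java.util.concurrent)
ConcurrentHashMap
AtomicInteger/...
ReentrantLock
ConcurrentHashMap has essentially one use case: anywhere a HashMap has thread-safety problems, swap in a ConcurrentHashMap. Each individual operation then becomes thread-safe, but a compound check-then-act (such as containsKey followed by put) still needs a single atomic method like putIfAbsent or computeIfAbsent.
Demo: the previous program using ConcurrentHashMap from the Java concurrency toolkit: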
```java
package com.github.hcsp.io;

import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

public class Main {
    private static final Map<Integer, Integer> map = new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException {
        for (int j = 0; j < 1000; j++) {
            new Thread(Main::concurrentlyAccess).start();
        }
    }

    private static void concurrentlyAccess() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Integer r = new Random().nextInt();
        map.put(r, r);
        // ConcurrentHashMap's iterators are weakly consistent: they never throw
        // ConcurrentModificationException while other threads modify the map
        for (Integer i : map.keySet()) {
            System.out.println(i);
        }
    }
}
```
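Swapping in ConcurrentHashMap does not by itself fix the check-then-act bug from the earlier putIfAbsent example. A minimal sketch of the atomic alternative follows, with `AtomicPutIfAbsent` as a hypothetical class name. `ConcurrentHashMap.putIfAbsent` checks and inserts in one step and returns null only to the thread that actually inserted, so each key is reported exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicPutIfAbsent {
    // Returns {number of successful inserts, final map size}
    public static int[] run(int threads) throws InterruptedException {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
        AtomicInteger inserts = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int j = 0; j < threads; j++) {
            Thread t = new Thread(() -> {
                int r = ThreadLocalRandom.current().nextInt(10); // 0 to 9
                // Check and insert as one atomic step; null means "this thread inserted it"
                if (map.putIfAbsent(r, r) == null) {
                    inserts.incrementAndGet();
                    System.out.println("Put " + r);
                }
            });
            t.start();
            workers.add(t);
        }
        for (Thread t : workers) {
            t.join();
        }
        return new int[]{inserts.get(), map.size()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = run(1000);
        // Each key is reported once, so the two numbers always match
        System.out.println(result[0] + " inserts, map size " + result[1]);
    }
}
```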
Using AtomicInteger
```java
package com.github.hcsp.io;

import java.util.concurrent.atomic.AtomicInteger;

public class Main {
    private static int a = 0;
    private static AtomicInteger b = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        for (int j = 0; j < 1000; j++) {
            new Thread(Main::concurrentlyAccess).start();
        }
    }

    private static void concurrentlyAccess() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // a += 1 is not atomic: read a from memory, add 1, write the result back
        a += 1;
        // System.out.println(a);

        // This is atomic; print its return value, because a separate b.get()
        // afterwards could already include other threads' increments
        System.out.println(b.addAndGet(1));
    }
}
```
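To compare final totals instead of interleaved output, the sketch below lets several threads increment both a plain `int` and an `AtomicInteger`, then joins them and reports both values. The class `CounterComparison` is a hypothetical name. The atomic total always equals threads × increments. The plain total usually comes out lower, though a run can still happen to hit the exact total.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CounterComparison {
    private static int plain = 0;                                    // updated with +=, unsafe
    private static final AtomicInteger atomic = new AtomicInteger(); // updated atomically

    // Returns {plain, atomic} after all threads have finished
    public static int[] run(int threads, int incrementsPerThread) throws InterruptedException {
        plain = 0;
        atomic.set(0);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int k = 0; k < incrementsPerThread; k++) {
                    plain += 1;               // read, add 1, write back: updates can be lost
                    atomic.incrementAndGet(); // one indivisible step
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join(); // join() makes the workers' writes visible here
        }
        return new int[]{plain, atomic.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = run(8, 100_000);
        // atomic is always 800000; plain is often smaller
        System.out.println("plain=" + result[0] + ", atomic=" + result[1]);
    }
}
```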
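Using ReentrantLock, a reentrant lock (the thread that holds it can acquire it again). Earlier we simply put synchronized in front of a static method, and that has a limitation: every thread must obtain the lock to enter, and the lock is always acquired and released within the same method or block. Sometimes you don't want to confine it to one method; you want to lock here and unlock somewhere far away. ReentrantLock allows this through explicit lock() and unlock() calls; otherwise it is almost equivalent to synchronized.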
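A minimal sketch of the two ReentrantLock properties described above, using a hypothetical `ReentrantLockDemo` class. `begin()` locks in one method and `end()` unlocks in another. `add()` shows reentrancy: the thread that already holds the lock can lock it again. The lock and the unlock must still happen on the same thread, because `unlock()` from a non-owner throws `IllegalMonitorStateException`.

```java
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private int value = 0;

    // Acquires the lock and returns while still holding it
    public void begin() {
        lock.lock();
    }

    public void add(int delta) {
        lock.lock(); // reentrant: the owning thread may lock again
        try {
            value += delta;
        } finally {
            lock.unlock(); // undoes only this inner lock()
        }
    }

    // Releases the lock taken in begin(); must run on the same thread
    public void end() {
        lock.unlock();
    }

    // How many times the current thread holds the lock
    public int holdCount() {
        return lock.getHoldCount();
    }

    public int getValue() {
        lock.lock();
        try {
            return value;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ReentrantLockDemo demo = new ReentrantLockDemo();
        demo.begin(); // lock here...
        try {
            demo.add(1);
            demo.add(2);
            System.out.println(demo.holdCount()); // 1
        } finally {
            demo.end(); // ...and unlock somewhere else
        }
        System.out.println(demo.getValue()); // 3
    }
}
```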
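Thread states and the thread-related methods of the Object class
History of threads
Java made threads a language feature from the very beginning, with language-level support.
Why can every object in Java serve as a lock?
The Object.wait()/notify()/notifyAll() methods
Thread states and thread scheduling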
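Thread states can be observed with `Thread.getState()`. In this sketch, with `ThreadStates` as a hypothetical class name, a thread is `NEW` before `start()`, `WAITING` while parked in `Object.wait()`, and `TERMINATED` after it finishes. The remaining states (`RUNNABLE`, `BLOCKED`, `TIMED_WAITING`) are not shown.

```java
public class ThreadStates {
    // Returns the states observed before start, while waiting, and after finishing
    public static Thread.State[] observe() throws InterruptedException {
        Object lock = new Object();
        boolean[] released = {false}; // guarded by lock
        Thread t = new Thread(() -> {
            synchronized (lock) {
                while (!released[0]) { // loop guards against spurious wakeups
                    try {
                        lock.wait(); // releases lock; state becomes WAITING
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
        Thread.State beforeStart = t.getState(); // NEW
        t.start();
        Thread.State whileWaiting;
        while ((whileWaiting = t.getState()) != Thread.State.WAITING) {
            Thread.sleep(10); // poll until the thread is parked in wait()
        }
        synchronized (lock) {
            released[0] = true;
            lock.notify();
        }
        t.join();
        return new Thread.State[]{beforeStart, whileWaiting, t.getState()};
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(java.util.Arrays.toString(observe())); // [NEW, WAITING, TERMINATED]
    }
}
```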
Further reading: why it is said that Java threads
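Whichever object you call wait() on, you must hold that object's monitor, that is, you must be inside a synchronized block locked on that object.
notify() wakes one arbitrarily chosen thread waiting on the monitor; notifyAll() wakes all threads waiting on the monitor, which then compete for the lock, and only one of them can win at a time.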
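The monitor rule is enforced at run time. In this small sketch, with `WaitWithoutMonitor` as a hypothetical class name, calling `wait()` outside `synchronized (lock)` throws `IllegalMonitorStateException`. The same call inside the block releases the monitor during the wait and holds it again when `wait()` returns, which `Thread.holdsLock` confirms.

```java
public class WaitWithoutMonitor {
    // Returns true if wait() throws when the caller does not hold the monitor
    public static boolean waitWithoutMonitorThrows() throws InterruptedException {
        Object lock = new Object();
        try {
            lock.wait(10); // called outside synchronized (lock)
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Inside synchronized (lock) the call is legal: wait() releases the monitor,
    // times out after about 10 ms, then re-acquires it before returning
    public static boolean waitWithMonitorSucceeds() throws InterruptedException {
        Object lock = new Object();
        synchronized (lock) {
            lock.wait(10);
            return Thread.holdsLock(lock);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(waitWithoutMonitorThrows()); // true
        System.out.println(waitWithMonitorSucceeds());  // true
    }
}
```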
Three ways to implement the producer-consumer model
Approach 1: wait/notify
```java
package com.github.hcsp.multithread;

import java.util.Optional;
import java.util.Random;

public class ProducerConsumer1 {
    public static void main(String[] args) throws InterruptedException {
        Container container = new Container();
        Object lock = new Object();

        Producer producer = new Producer(container, lock);
        Consumer consumer = new Consumer(container, lock);

        producer.start();
        consumer.start();

        producer.join();
        consumer.join();
    }

    public static class Producer extends Thread {
        Container container;
        Object lock;

        public Producer(Container container, Object lock) {
            this.container = container;
            this.lock = lock;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                synchronized (lock) {
                    // Wait while the container still holds an unconsumed value
                    while (container.getValue().isPresent()) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    int r = new Random().nextInt();
                    System.out.println("Producing " + r);
                    container.setValue(Optional.of(r));
                    lock.notify();
                }
            }
        }
    }

    public static class Consumer extends Thread {
        Container container;
        Object lock;

        public Consumer(Container container, Object lock) {
            this.container = container;
            this.lock = lock;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                synchronized (lock) {
                    // Wait while the container is empty
                    while (!container.getValue().isPresent()) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Integer value = container.getValue().get();
                    container.setValue(Optional.empty());
                    System.out.println("Consuming " + value);
                    lock.notify();
                }
            }
        }
    }

    public static class Container {
        Optional<Integer> value = Optional.empty();

        public Optional<Integer> getValue() {
            return value;
        }

        public void setValue(Optional<Integer> value) {
            this.value = value;
        }
    }
}
```
Approach 2: Lock and Condition
```java
package com.github.hcsp.multithread;

import java.util.Optional;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumer2 {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Container container = new Container(lock);

        Producer producer = new Producer(container, lock);
        Consumer consumer = new Consumer(container, lock);

        producer.start();
        consumer.start();

        producer.join();
        consumer.join();
    }

    public static class Producer extends Thread {
        Container container;
        ReentrantLock lock;

        public Producer(Container container, ReentrantLock lock) {
            this.container = container;
            this.lock = lock;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                lock.lock(); // lock before try, so finally never unlocks a lock we don't hold
                try {
                    // Wait while the container still holds an unconsumed value
                    while (container.getValue().isPresent()) {
                        try {
                            container.getNotConsumedYet().await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    int r = new Random().nextInt();
                    System.out.println("Producing " + r);
                    container.setValue(Optional.of(r));
                    // Wake the consumer waiting for a value to be produced
                    container.getNotProducedYet().signal();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    public static class Consumer extends Thread {
        Container container;
        ReentrantLock lock;

        public Consumer(Container container, ReentrantLock lock) {
            this.container = container;
            this.lock = lock;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                lock.lock();
                try {
                    // Wait while the container is empty
                    while (!container.getValue().isPresent()) {
                        try {
                            container.getNotProducedYet().await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Integer value = container.getValue().get();
                    container.setValue(Optional.empty());
                    System.out.println("Consuming " + value);
                    // Wake the producer waiting for the value to be consumed
                    container.getNotConsumedYet().signal();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    public static class Container {
        private Condition notConsumedYet; // the value has not been consumed yet
        private Condition notProducedYet; // no value has been produced yet
        private Optional<Integer> value = Optional.empty();

        public Container(ReentrantLock lock) {
            this.notConsumedYet = lock.newCondition();
            this.notProducedYet = lock.newCondition();
        }

        public Condition getNotConsumedYet() {
            return notConsumedYet;
        }

        public Condition getNotProducedYet() {
            return notProducedYet;
        }

        public Optional<Integer> getValue() {
            return value;
        }

        public void setValue(Optional<Integer> value) {
            this.value = value;
        }
    }
}
```
Approach 3: BlockingQueue (BlockingQueue is an interface)
```java
package com.github.hcsp.multithread;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumer3 {
    public static void main(String[] args) throws InterruptedException {
        // Capacity 1: holds at most one produced value
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);
        // Capacity 1: the consumer uses it to tell the producer "consumed"
        BlockingQueue<Integer> signalQueue = new LinkedBlockingQueue<>(1);

        Producer producer = new Producer(queue, signalQueue);
        Consumer consumer = new Consumer(queue, signalQueue);

        producer.start();
        consumer.start();

        producer.join();
        consumer.join();
    }

    public static class Producer extends Thread {
        BlockingQueue<Integer> queue;
        BlockingQueue<Integer> signalQueue;

        public Producer(BlockingQueue<Integer> queue, BlockingQueue<Integer> signalQueue) {
            this.queue = queue;
            this.signalQueue = signalQueue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                int r = new Random().nextInt();
                System.out.println("Producing " + r);
                try {
                    queue.put(r);       // blocks while the queue is full
                    signalQueue.take(); // wait until the consumer has printed it
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer extends Thread {
        BlockingQueue<Integer> queue;
        BlockingQueue<Integer> signalQueue;

        public Consumer(BlockingQueue<Integer> queue, BlockingQueue<Integer> signalQueue) {
            this.queue = queue;
            this.signalQueue = signalQueue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    System.out.println("Consuming " + queue.take()); // blocks while empty
                    // Put any value as the "consumed" signal
                    signalQueue.put(0);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
```
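Thread pools, Callable, and Future
What is a thread pool?
Threads are expensive (a shortcoming of Java's threading model, in which each Java thread maps to an operating-system thread).
A thread pool is a number of threads created in advance and reused to run many tasks.
Thread pools in Java
Callable/Future
A Runnable returns no value and cannot throw checked exceptions, so it has to swallow them; a Callable can return a value and throw exceptions.
A Future represents "a result that will only be available in the future".
Thread pool example: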
```java
package com.github.hcsp.multithread;

import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

public class MultiThreadWordCount1 {
    public MultiThreadWordCount1(int threadNum) {
        ExecutorService threadPool = Executors.newFixedThreadPool(threadNum);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        try {
            // submit() returns immediately with a Future whose value arrives later
            Future<Integer> future1 = threadPool.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    Thread.sleep(1000);
                    return 0;
                }
            });
            Future<String> future2 = threadPool.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    Thread.sleep(1000);
                    return "abc";
                }
            });
            Future<Object> future3 = threadPool.submit(new Callable<Object>() {
                @Override
                public String call() throws Exception {
                    throw new RuntimeException();
                }
            });

            // get() returns immediately if the task has already finished; otherwise it waits
            System.out.println(future1.get());
            System.out.println(future2.get());
            // This task threw, so get() throws an ExecutionException wrapping that exception
            System.out.println(future3.get());
        } finally {
            // Without shutdown(), the pool's non-daemon threads keep the JVM running
            threadPool.shutdown();
        }
    }

    // Count the occurrences of each word in the files, using threadNum threads
    public static Map<String, Integer> count(int threadNum, List<File> files) {
        return null;
    }
}
```
Hands-on: a multithreaded WordCount
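One possible sketch of the `count(threadNum, files)` stub above. It assumes plain-text files with words separated by whitespace; the course exercise may define words differently. The class name `WordCountSketch` is hypothetical, and unlike the stub this version declares the checked exceptions it can throw. Each file becomes one Callable that counts into its own private HashMap, so worker threads share no mutable state. The calling thread then merges the per-file results through `Future.get()`.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WordCountSketch {
    // Count the words in all files using threadNum threads, one task per file
    public static Map<String, Integer> count(int threadNum, List<File> files)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        try {
            List<Future<Map<String, Integer>>> futures = new ArrayList<>();
            for (File file : files) {
                futures.add(pool.submit(() -> countOneFile(file))); // a Callable
            }
            // Merge only on the calling thread, so a plain HashMap is safe here
            Map<String, Integer> result = new HashMap<>();
            for (Future<Map<String, Integer>> future : futures) {
                future.get().forEach((word, n) -> result.merge(word, n, Integer::sum));
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }

    // Runs on a pool thread; the map it builds is private to this task
    private static Map<String, Integer> countOneFile(File file) throws IOException {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : Files.readAllLines(file.toPath())) {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) throws Exception {
        List<File> files = new ArrayList<>();
        for (String path : args) {
            files.add(new File(path));
        }
        System.out.println(count(4, files));
    }
}
```

An IOException inside a task surfaces as an ExecutionException from `get()`, which is the same mechanism shown by `future3` above.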
Exercises
Exercise 1
Approach 1: the synchronized keyword
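```java
package com.github.hcsp.multithread;

public class Counter {
    private int value = 0;
    // One lock per Counter instance guards value
    private final Object lock = new Object();

    public int getValue() {
        // Reads are synchronized too, so they see the latest value
        synchronized (lock) {
            return value;
        }
    }

    // Add an integer i and return the result after adding
    public int addAndGet(int i) {
        synchronized (lock) {
            value += i;
            return value;
        }
    }

    // Subtract an integer i and return the result after subtracting
    public int minusAndGet(int i) {
        synchronized (lock) {
            value -= i;
            return value;
        }
    }
}
```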
Approach 2: ReentrantLock
```java
package com.github.hcsp.multithread;

import java.util.concurrent.locks.ReentrantLock;

public class Counter {
    private int value = 0;
    private final ReentrantLock lock = new ReentrantLock();

    public int getValue() {
        lock.lock();
        try {
            return value;
        } finally {
            lock.unlock();
        }
    }

    // Add an integer i and return the result after adding
    public int addAndGet(int i) {
        lock.lock(); // acquire before try, so finally only unlocks a lock we hold
        try {
            value += i;
            return value;
        } finally {
            lock.unlock();
        }
    }

    // Subtract an integer i and return the result after subtracting
    public int minusAndGet(int i) {
        lock.lock();
        try {
            value -= i;
            return value;
        } finally {
            lock.unlock();
        }
    }
}
```
Approach 3: AtomicInteger
```java
package com.github.hcsp.multithread;

import java.util.concurrent.atomic.AtomicInteger;

public class Counter {
    // A single AtomicInteger holds the value; each update is atomic
    private final AtomicInteger value = new AtomicInteger(0);

    public int getValue() {
        return value.get();
    }

    // Add an integer i and return the result after adding
    public int addAndGet(int i) {
        return value.addAndGet(i);
    }

    // Subtract an integer i and return the result after subtracting
    public int minusAndGet(int i) {
        return value.addAndGet(-i);
    }
}
```
Author: snakeshe1010
Link: https://juejin.cn/post/7032172490613129229