阅读 284

Java多线程初步【爬虫项目实战】(java多线程编程实战指南)

介绍多线程带来的问题,以及基本解决方案。 竞争条件带来的数据错误问题 死锁的原理、排查与防范 线程安全:同步方法、并发工具包 复制代码

多线程原理回顾

  • 为什么要有多线程,cpu非常快,现代cpu都是多核

package com.github.hcsp.io; public class Crawler {     public static int i = 0;     public static void main(String[] args) {         for (int j = 0; j < 1000; j++) {             new Thread(() -> {                 try {                     Thread.sleep(1);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 System.out.println(++i);             }).start();         }     } } 复制代码

1共享数据的问题.png

竞争条件带来的数据错误

  • 数据错误

    • i++

    • if-then-do

  • 多线程下看到一个东西第一反应,这个东西是不是线程安全

    • 文档里搜索thread,非常经典的问题比如hashmap死锁

    • 案例,原因是多线程无法保证它的原子性

package com.github.hcsp.io; import java.util.HashMap; import java.util.Map; import java.util.Random; public class Main {     public static int i = 0;     private static Map<Integer, Integer> map = new HashMap<>();     public static void main(String[] args) throws InterruptedException {         for (int j = 0; j < 1000; j++) {             new Thread(Main::putIfAbsent).start();         }     }     private static void putIfAbsent()  {         try {             Thread.sleep(1);         } catch (InterruptedException e) {             e.printStackTrace();         }         // 随机生成一个1到10之间的数字,如果它不在map中,就把它加入map         int r = new Random().nextInt(10);         if(!map.containsKey(r)) {             map.put(r,r);             System.out.println("Put " + r);         }     } } 复制代码

死锁详解、排查与防范

  • 著名的HashMap的死循环的问题

  • 写一段代码来重现死锁

  • 死锁问题的排查

  • 多线程的经典问题:哲学家用餐

synchronized关键字

  • 说白了就是一把锁

  • 例子

package com.github.hcsp.io; public class Main {     private static final Object lock1 = new Object();     private static final Object lock2 = new Object();     public static void main(String[] args) throws InterruptedException {         new Thread1().start();         new Thread2().start();     }     static class Thread1 extends Thread {         @Override         public void run() {             synchronized (lock1) {                 try {                     Thread.sleep(500);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 synchronized (lock2) {                     System.out.println("1");                 }             }         }     }     static class Thread2 extends Thread {         @Override         public void run() {             synchronized (lock2) {                 try {                     Thread.sleep(100);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 synchronized (lock1) {                     System.out.println("2");                 }             }         }     } } 复制代码

2死锁.png

2hashmap.png

排查死锁的方法

  • 首先要知道进程的id是多少

  • 查看进程,这种方法比较麻烦,直接用java自带的命令,jps

ps aux | grep java 复制代码

  • 找到进程id,91790,jstack导出到文件 jstack > output.txt, 此命令也可以用于排查程序非常慢的情况

  • 一个对象被用作锁的用途,放到synchronized块里面的话,称之为监视器

  • 实际工作中情况可能非常复杂,需要jstack逐一排查

  • 预防死锁产生的原则

    • 所有的线程都按照相同的顺序获得资源的锁

线程安全:同步方法、并发工具包

  • 实现线程安全的基本手段

    • 不可变类,Integer/String/...

    • String S = "abc",S指向的是一个地址,真正的数据abc存在堆里,String的对象是不会变的,但是可以把新的字符串变量指向一个变量

3.png

第二种同步方法

  • 实现线程安全的基本手段

    • synchronized(一个对象),把这个对象当成锁

    • Static synchronized  ,静态方法不和任何对象绑定,jvm会偷偷的锁class对象

    • 实例的synchronized方法把该实例当成锁

    • synchronized同步块

    • 同步块同步了什么东西?

    • Collection.synchronized

4.png

  • 演示同步块1

package com.github.hcsp.io; public class Main {     private static int i = 0;     private static final Object lock = new Object();     public static void main(String[] args) throws InterruptedException {         for (int j = 0; j < 1000; j++) {             new Thread(Main::modifySharedVariable).start();         }     }     private static void modifySharedVariable() {         try {             Thread.sleep(1);         } catch (InterruptedException e) {             e.printStackTrace();         }         synchronized (lock) {             i += 1;         }         System.out.println(i);     }      } 复制代码

  • 能否更简单点

package com.github.hcsp.io; public class Main {     private static int i = 0;     private static final Object lock = new Object();     public static void main(String[] args) throws InterruptedException {         for (int j = 0; j < 1000; j++) {             new Thread(Main::modifySharedVariable).start();         }     }     private synchronized static void modifySharedVariable() {         try {             Thread.sleep(1);         } catch (InterruptedException e) {             e.printStackTrace();         } //        synchronized (lock) { //            i += 1; //        }         i += 1;         System.out.println(i);     } } 复制代码

  • 实例的synchronized方法把该实例当成锁

5.png

  • 总结:代码加锁synchronized,阻止多人同步操作锁,任何时刻只能有一个人去执行他

  • 相关代码

package com.github.hcsp.io; public class Main {     private int i = 0;     private static final Object lock = new Object();     public static void main(String[] args) throws InterruptedException {         Main Object = new Main();         for (int j = 0; j < 1000; j++) {             new Thread(Object::modifySharedVariable).start();         }     }     private synchronized void modifySharedVariable() {         try {             Thread.sleep(1);         } catch (InterruptedException e) {             e.printStackTrace();         }         i += 1;         System.out.println(i);     }     // 功能等价于     private synchronized void modifySharedVariable1() {         synchronized (this) {             try {                 Thread.sleep(1);             } catch (InterruptedException e) {                 e.printStackTrace();             }             i += 1;             System.out.println(i);         }     } } 复制代码

无论面试还是日常工作都非常常用的东西

  • List -> ArrayList

  • Set -> HashSet, TreeSet

  • Map -> HashMap, LinkedHashMap

  • 以上都不是线程安全的,只要不是强调自己是线程安全就不是线程安全的

  • 演示线程不安全

package com.github.hcsp.io; import java.util.HashMap; import java.util.Map; import java.util.Random; public class Main {     private int i = 0;     private static final Object lock = new Object();     private static final Map<Integer, Integer> map = new HashMap<>();     public static void main(String[] args) throws InterruptedException {         for (int j = 0; j < 1000; j++) {             new Thread(Main::concurrentlyAccess).start();         }     }     private static void concurrentlyAccess() {         try {             Thread.sleep(1);         } catch (InterruptedException e) {             e.printStackTrace();         }         Integer r = new Random().nextInt();         map.put(r, r);         for(Integer i: map.keySet()) {             System.out.println(i);         }     } } 复制代码

  • 解决办法,用collections.synchronizedMap,

实现线程安全的基本手段

  • JUC包

    • ConcurrentHashMap的使用场景只有一个,任何使用hashMap有线程安全问题的地方 都无脑的使用ConcurrentHashMap替换即可

    • AtomicInteger/...

    • ConcurrentHashMap,java并发工具包

    • ReentrantLock

package com.github.hcsp.io; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; public class Main {     private int i = 0;     private static final Object lock = new Object();     private static final Map<Integer, Integer> map = new ConcurrentHashMap<>();     private static int a = 0;     public static void main(String[] args) throws InterruptedException {         for (int j = 0; j < 1000; j++) {             new Thread(Main::concurrentlyAccess).start();         }     }     private static void concurrentlyAccess() {         try {             Thread.sleep(1);         } catch (InterruptedException e) {             e.printStackTrace();         }         Integer r = new Random().nextInt();         map.put(r, r);         for(Integer i: map.keySet()) {             System.out.println(i);         }     } } 复制代码

  • atomicInteger的使用

package com.github.hcsp.io; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; public class Main {     private int i = 0;     private static final Object lock = new Object();     private static final Map<Integer, Integer> map = new ConcurrentHashMap<>();     private static int a = 0;     private static AtomicInteger b = new AtomicInteger(0);     public static void main(String[] args) throws InterruptedException {         for (int j = 0; j < 1000; j++) {             new Thread(Main::concurrentlyAccess).start();         }     }     private static void concurrentlyAccess() {         try {             Thread.sleep(1);         } catch (InterruptedException e) {             e.printStackTrace();         }         // a+=1操作不是原子的,从内存中取a,+1,把结果写回内存         a += 1; //        System.out.println(a);         // 它是原子的         b.addAndGet(1);         System.out.println(b.get());     } } 复制代码

  • ReentrantLock的使用,可重用锁,之前直接在static前面加synchronized有个问题,

    • 问题就是每一个线程访问的时候拿到锁才能访问,问题就是你不想在一个方法里面做这个事情,

    • 想在这里枷锁,遥远的另一个地方解锁,几乎等价synchronized

线程的状态与Object类中的线程方法

  • 线程的历史

    • java从一开始就把线程作为语言特性,提供语言级的支持

  • 为什么java中的所有对象都可以成为锁

    • Object.wait()/notify()/notifyAll()方法

    • 线程的状态与线程调度

    • 扩展-为什么说java的线程

  • 调用哪个wait的方法,必须持有这个对象的monitor,代表synchronized锁住的那个对象

  • notify是任意挑选一个等待的线程进行唤醒,notifyAll,唤醒所有的等待的监视器的进程,开始竞争锁,只有一个能竞争成功

6.png

用三种方法实现生产者消费者模型

  • 方法1,使用wait,notify

package com.github.hcsp.multithread; import java.util.Optional; import java.util.Random; public class ProducerConsumer1 {     public static void main(String[] args) throws InterruptedException {         Container container = new Container();         Object lock = new Object();         Producer producer = new Producer(container, lock);         Consumer consumer = new Consumer(container, lock);         producer.start();         consumer.start();         producer.join();         producer.join();     }     public static class Producer extends Thread {         Container container;         Object lock;         public Producer(Container container, Object lock) {             this.container = container;             this.lock = lock;         }         @Override         public void run() {             for (int i = 0; i < 10; i++) {                 synchronized (lock) {                     // 判断container存没存对象                     while (container.getValue().isPresent()) {                         try {                             lock.wait();                         } catch (InterruptedException e) {                             e.printStackTrace();                         }                     }                     int r = new Random().nextInt();                     System.out.println("Producing " + r);                     container.setValue(Optional.of(r));                     lock.notify();                 }             }         }     }     public static class Consumer extends Thread {         Container container;         Object lock;         public Consumer(Container container, Object lock) {             this.container = container;             this.lock = lock;         }         @Override         public void run() {             for (int i = 0; i < 10; i++) {                 synchronized (lock) {                     // 判断container存没存对象                     while (!container.getValue().isPresent()) {                         try {                             lock.wait();                         } catch (InterruptedException e) {                             e.printStackTrace();                         }                     }                     Integer value = container.getValue().get();                     container.setValue(Optional.empty());                     System.out.println("Consuming " + value);                     lock.notify();                 }             }         }     }     public static class Container {         public Optional<Integer> getValue() {             return value;         }         public void setValue(Optional<Integer> value) {             this.value = value;         }         Optional<Integer> value = Optional.empty();     } } 复制代码

  • 方法二,lock和condition

package com.github.hcsp.multithread; import java.util.Optional; import java.util.Random; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class ProducerConsumer2 {     public static void main(String[] args) throws InterruptedException {         ReentrantLock lock = new ReentrantLock();         Container container = new Container(lock);         Producer producer = new Producer(container, lock);         Consumer consumer = new Consumer(container, lock);         producer.start();         consumer.start();         producer.join();         producer.join();     }     public static class Producer extends Thread {         Container container;         ReentrantLock lock;         public Producer(Container container, ReentrantLock lock) {             this.container = container;             this.lock = lock;         }         @Override         public void run() {             for (int i = 0; i < 10; i++) {                 try {                     lock.lock();                     // 判断container存没存对象                     while (container.getValue().isPresent()) {                         try {                             container.notConsumedYet.await();                         } catch (InterruptedException e) {                             e.printStackTrace();                         }                     }                     int r = new Random().nextInt();                     System.out.println("Producing " + r);                     container.setValue(Optional.of(r));                     container.getNotProducedYet().signal();                 } finally {                     lock.unlock();                 }             }         }     }     public static class Consumer extends Thread {         Container container;         ReentrantLock lock;         public Consumer(Container container, ReentrantLock lock) {             this.container = container;             this.lock = lock;         }         @Override         public void run() {             for (int i = 0; i < 10; i++) {                 try {                     lock.lock();                     // 判断container存没存对象                     while (!container.getValue().isPresent()) {                         try {                             container.getNotProducedYet().await();                         } catch (InterruptedException e) {                             e.printStackTrace();                         }                     }                     Integer value = container.getValue().get();                     container.setValue(Optional.empty());                     System.out.println("Consuming " + value);                     container.getNotConsumedYet().signal();                 } finally {                     lock.unlock();                 }             }         }     }     public static class Container {         private Condition notConsumedYet; // 还没有被消费掉         private Condition notProducedYet; // 还没有被生产出来         public Container(ReentrantLock lock) {             this.notConsumedYet = lock.newCondition();             this.notProducedYet = lock.newCondition();         }         public Condition getNotConsumedYet() {             return notConsumedYet;         }         public Condition getNotProducedYet() {             return notProducedYet;         }         private Optional<Integer> value = Optional.empty();         public Optional<Integer> getValue() {             return value;         }         public void setValue(Optional<Integer> value) {             this.value = value;         }     } } 复制代码

  • 方法三,blocking queue,blocking queue是一个接口

package com.github.hcsp.multithread; import java.util.Optional; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class ProducerConsumer3 {     public static void main(String[] args) throws InterruptedException {         BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);         BlockingQueue<Integer> signalQueue = new LinkedBlockingQueue<>(1);         Producer producer = new Producer(queue, signalQueue);         Consumer consumer = new Consumer(queue, signalQueue);         producer.start();         consumer.start();         producer.join();         producer.join();     }     public static class Producer extends Thread {         BlockingQueue<Integer> queue;         public Producer(BlockingQueue<Integer> queue, BlockingQueue<Integer> signalQueue) {             this.queue = queue;             this.signalQueue = signalQueue;         }         BlockingQueue<Integer> signalQueue;         @Override         public void run() {             for (int i = 0; i < 10; i++) {                 int r = new Random().nextInt();                 System.out.println("Producing " + r);                 try {                     queue.put(r);                     signalQueue.take();                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }         }     }     public static class Consumer extends Thread {         BlockingQueue<Integer> queue;         public Consumer(BlockingQueue<Integer> queue, BlockingQueue<Integer> signalQueue) {             this.queue = queue;             this.signalQueue = signalQueue;         }         BlockingQueue<Integer> signalQueue;         @Override         public void run() {             for (int i = 0; i < 10; i++) {                 try {                     System.out.println("Consuming " + queue.take());                     // 随便放一个                     signalQueue.put(0);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 ;             }         }     }     public static class Container {         private Condition notConsumedYet; // 还没有被消费掉         private Condition notProducedYet; // 还没有被生产出来         public Container(ReentrantLock lock) {             this.notConsumedYet = lock.newCondition();             this.notProducedYet = lock.newCondition();         }         public Condition getNotConsumedYet() {             return notConsumedYet;         }         public Condition getNotProducedYet() {             return notProducedYet;         }         private Optional<Integer> value = Optional.empty();         public Optional<Integer> getValue() {             return value;         }         public void setValue(Optional<Integer> value) {             this.value = value;         }     } } 复制代码

线程池和callable和Future

  • 什么是线程池

    • 线程是昂贵的(Java线程模型的缺陷)

    • 线程池是预先定义好的若干个线程

    • Java中的线程池

  • Callable/Future

    • 类比Runnable没有返回值,不能抛出异常必须吞掉异常, Callable可以返回值,抛出异常

    • Future代表一个"未来才会返回的结果"

  • 线程池案例,

package com.github.hcsp.multithread; import java.io.File; import java.util.List; import java.util.Map; import java.util.concurrent.*; public class MultiThreadWordCount1 {     public MultiThreadWordCount1(int threadNum) {         ExecutorService threadPool = Executors.newFixedThreadPool(threadNum);     }     public static void main(String[] args) throws ExecutionException, InterruptedException {         ExecutorService threadPool = Executors.newFixedThreadPool(10);         // 未来能         Future<Integer> future1 = threadPool.submit(new Callable<Integer>() {             @Override             public Integer call() throws Exception {                 Thread.sleep(1000);                 return 0;             }         });         Future<String> future2 = threadPool.submit(new Callable<String>() {             @Override             public String call() throws Exception {                 Thread.sleep(1000);                 return "abc";             }         });         Future<Object> future3 = threadPool.submit(new Callable<Object>() {             @Override             public String call() throws Exception {                 throw new RuntimeException();             }         });         // 调用get方法的时候如果future里面的值已经完成了,就会立刻返回,否则就会等待         System.out.println(future1.get());         System.out.println(future2.get());         System.out.println(future3.get());     }     // 使用threadNum个线程,并发统计文件中各单词的数量     public static Map<String, Integer> count(int threadNum, List<File> files) {         return null;     } } 复制代码

  • 实战: 多线程的WordCount

课后练习题

  • 练习题1

    • synchronized关键字

package com.github.hcsp.multithread; public class Counter {     private int value = 0;     private static final Object lock1 = new Object();     public int getValue() {         return value;     }     // 加上一个整数i,并返回加之后的结果     public int addAndGet(int i) {         synchronized (lock1) {             value += i;             return value;         }     }     // 减去一个整数i,并返回减之后的结果     public int minusAndGet(int i) {         synchronized (lock1) {             value -= i;             return value;         }     } } 复制代码

  • 方法二,ReentrantLock

package com.github.hcsp.multithread; import java.util.concurrent.locks.ReentrantLock; public class Counter {     private int value = 0;     private ReentrantLock lock = new ReentrantLock();     public int getValue() {         return value;     }     // 加上一个整数i,并返回加之后的结果     public int addAndGet(int i) {         try {             lock.lock();             System.out.println(value);             value += i;             return value;         } finally {             lock.unlock();         }     }     // 减去一个整数i,并返回减之后的结果     public int minusAndGet(int i) {         try {             lock.lock();             value -= i;             return value;         } finally {             lock.unlock();         }     } } 复制代码

  • 方法三,AtomicInteger的方法

package com.github.hcsp.multithread; import java.util.concurrent.atomic.AtomicInteger; public class Counter {     private static int value = 0;     private static AtomicInteger a = new AtomicInteger(value);     private static AtomicInteger b = new AtomicInteger(value);     public int getValue() {         return value;     }     // 加上一个整数i,并返回加之后的结果     public int addAndGet(int i) {         a.addAndGet(i);         return value;     }     // 减去一个整数i,并返回减之后的结果     public int minusAndGet(int i) {         b.addAndGet(-i);         return value;     } }


作者:snakeshe1010
链接:https://juejin.cn/post/7032172490613129229


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐