java并发包多线程编程,Semaphore,CountDownLatch,CyclicBarrier,Phaser和FutureTask,Callback,Runnable,Thread,Exchanger,ThreadLocal的简单说明及示例

作者: admin 分类: JAVA 发布时间: 2019-06-19 13:03  阅读: 309 views

并发编程在实际工作中应用的并不是很多,但还是需要了解,可以在某些情况下有效的提高代码质量|效率等,也是增加了一种新的实现方案等。


Thread:线程类,由于java单继承的特性,不如runnable容易扩展。
—使用场景:可以把一些不需要返回结果|不影响主线程的业务逻辑,扔给一个新的线程。如,记录业务日志。

 

Runnable:线程接口,可以多实现。
—使用场景:如上。

 

Callable:线程接口,可以抛出异常及返回结果。 配合 Future,FutureTask使用
—使用场景:多线程开发过程中,需要获取执行结果及异常的业务。

 

SemaPhore:并发编程类-信号量,可以用来处理池类概念的业务。
—使用场景:线程池、连接池、网关限流等。 通过一定量的许可,按要求获取,释放,达到控制。

 

CountDownLatch:并发编程类-计数器,用于等待指定数量线程全部执行完之后,统一开始新的业务;或允许一个或多个线程等待其他的线程结束后开始。
—使用场景:倒计时类的业务。如,两个小组进行三轮比赛。每组的所有成员都完成当前任务的情况才能进入下轮任务。

 

CyclicBarrier:并发编程类-屏障。等待所有指定数量的线程达到同步点之后,统一开始执行。两步执行。
—使用场景:有统一起始点的业务。如,所有运动员都来了,才能开始比赛。像截流泄洪一样。

 

Phaser:并发编程类, 像是CyclicBarrier的升级版。可以多步执行。
—使用场景:业务中有多个环节,每个环节节点需要统一执行。如,游泳比赛。1,全部入场;2,开始比赛;3,比赛结束;4,颁奖;5,退场。

 

Exchanger:并发编程-交换类,满足条件后,交换两个线程持有的对象。
—使用场景:两个账户,一个入账,一个出账。当出账的钱没有之后,立马交换两个账户的数据。类似这种。

 

ThreadLocal:本地线程变量,在每个线程内对该变量创建一个副本,在线程任何地方都可以使用,相互之间不影响。但是要考虑内存的占用率
—使用场景:Spring事务管理器、简单动态数据源的实现。

 

Thread:

package com.chl.thread;


/**
 * java线程类 Thread
 * 
 * @author chenhailong
 * @date 2019年6月10日 下午4:51:09 
 */
public class ThreadTest {

  /**
   * @param args
   */
  public static void main(String[] args) {
    T t1 = new T();
    T t2 = new T();
    
    //run()只是一个方法,会在主线程内顺序执行
    t1.start();
    t2.start();
  }

  static class T extends Thread{
    @Override
    public void run() {
      for(int i = 0;i < 10;i++) {
        try {
          Thread.sleep(2 * 1000l);
          System.out.println("这是"+i);
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      }
    }
  }
}

Runnable:

package com.chl.thread;


/**
 * java线程接口,runnable的实现
 * 
 * @author chenhailong
 * @date 2019年6月10日 下午5:14:34
 */
public class RunnableTest {

  /**
   * @param args
   */
  public static void main(String[] args) {

    R r = new R();
    Thread t1 = new Thread(r);
    Thread t2 = new Thread(r);
    
    t1.start();
    t2.start();
  }

  static class R implements Runnable {
    @Override
    public void run() {
      try {
        for (int i = 0; i < 10; i++) {
          System.out.println("这是++"+i);
          Thread.sleep(2 * 1000l);
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

}

Callable:

package com.chl.thread;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * java线程接口Callable,与runnable不同的是,可以抛出异常并有返回值
 * 配合 FutureTask执行
 * 
 * @author chenhailong
 * @date 2019年6月13日 下午7:07:17
 */
public class CallableTest {

  /**
   * @param args
   */
  public static void main(String[] args) {

    ExecutorService es = Executors.newFixedThreadPool(5);
    String poolStr = "";
    Calls cs = new Calls();
    try {


      // 一、如果直接调用call方法,并没有开启一个新的线程,打印线程id可以看出来
//       System.out.println(Thread.currentThread().getName());
//       cs.call();

      // 二、配合FutureTask运行
//      FutureTask<String> ft = new FutureTask<String>(cs);
//      System.out.println(Thread.currentThread().getId());
//      new Thread(ft).start();
//      ft.get(); //执行结果
      
      //二、2 配合Future运行
      Future<String> f = es.submit(cs);
      System.out.println(f.get());
      
      //三、线程池处理
//      for(int i = 0;i< 10;i++) {
//        pool p = new pool(i+"");
//        poolStr = poolStr +"---"+ es.submit(p).get();
//        System.out.println(poolStr);
//      }
      
    } catch (Exception e) {
      e.printStackTrace();
    }

  }


  static class Calls implements Callable<String> {
    @Override
    public String call() throws Exception {
      int num = new Random().nextInt(3);
      System.out.println(Thread.currentThread().getId());
      if (num % 2 == 0) {
        Integer.parseInt("s");
      }
      return num + "";
    }
  }
  
  //线程池做法示例类
  static class pool implements Callable<String> {
    private String str;
    
    pool(String str){
      this.str = str;
    }
    @Override
    public String call() throws Exception {
      return str;
    }
  }

}

Semaphore:

package com.chl.thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * 同步关键类,
 * 可以做流量控制、池类处理
 * 
 * @author chenhailong
 * @date 2019年6月6日 上午10:25:40
 */
public class SemaphoreTest {

  /**
   * @param args
   */
  public static void main(String[] args) {
    // 示例一
    // Pool pl = new Pool();
    // pl.putItem("b");

    // 示例二
    FlowController fc = new FlowController();
    fc.test();

  }

  /**
   * jdk源码中的semaphore示例
   * @author chenhailong
   * @date 2019年6月10日 上午10:38:47
   */
  static class Pool {
    private static final int MAX_AVAILABLE = 2;
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
    protected Object[] items = new Object[] {'a', 'b'};
    protected boolean[] used = new boolean[MAX_AVAILABLE];

    public Object getItem() throws InterruptedException {
      available.acquire(); // 获得许可
      return getNextAvailableItem();
    }

    public void putItem(Object x) {
      System.out.println(available.availablePermits());
      if (markAsUnused(x))
        available.release(); // 释放资源
    }

    protected synchronized Object getNextAvailableItem() {
      for (int i = 0; i < MAX_AVAILABLE; ++i) {
        if (!used[i]) {
          used[i] = true;
          return items[i];
        }
      }
      return null;
    }

    protected synchronized boolean markAsUnused(Object item) {
      for (int i = 0; i < MAX_AVAILABLE; ++i) {
        if (item == items[i]) {
          if (used[i]) {
            used[i] = false;
            return true;
          } else
            return false;
        }
      }
      return false;
    }
  }


  /**
   * 简单流控处理,每次执行10个线程。
   * @author chenhailong
   * @date 2019年6月10日 上午11:01:04
   */
  static class FlowController {
    final int THREAD_NUMS = 30;
    ExecutorService pool = Executors.newFixedThreadPool(THREAD_NUMS);
    void test() {
      Semaphore sh = new Semaphore(10);
      for (int i = 0; i < 30; i++) {
        final String num = i+"";
        pool.execute(() -> {
          try {
            sh.acquire();
            Thread.sleep(1 * 1000);
            System.out.println("doSomething:"+num+";executeTime:"+System.currentTimeMillis());
            sh.release();
          } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
          }
        });
      }
      pool.shutdown();
    }
  }

}

CountDownLatch:

package com.chl.thread;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 同步辅助类CountDownLatch,允许一个或多个线程等待其他的线程系列操作 结束之后开始执行
 * 
 * @author chenhailong
 * @date 2019年6月10日 上午11:04:30
 */
public class CountDownLatchTest {

  /**
   * @param args
   */

  public static void main(String[] args) {
    //jdk代码中的示例
    new CountDownLatchTest().officalExample();
    
    //简单示例
    new CountDownLatchTest().basicUse();
  }

  /**
   * jdk1.8的文档官方示例
   * 两个计数器,一个执行完之后等待另一个执行。
   */
  void officalExample() {
    int thread = 10;
    CountDownLatch startSignal = new CountDownLatch(1);
    CountDownLatch doneSignal = new CountDownLatch(thread);
    for (int i = 0; i < thread; i++) {
      new Thread(new Worker(startSignal, doneSignal)).start();
    }
    System.out.println("startSignal计数减1");
    startSignal.countDown();
    try {
      System.out.println("等待doneSignal的计数减为0!");
      doneSignal.await();
      System.out.println("开始执行其他业务");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
      this.startSignal = startSignal;
      this.doneSignal = doneSignal;
    }
    @Override
    public void run() {
      try {
        startSignal.await();
        long sleepTime = (new Random().nextInt(4)+1) * 1000l;
        Thread.sleep(sleepTime);
        System.out.println("当前线程的执行时间为"+sleepTime+"s");
        doneSignal.countDown();
        System.out.println("计数减1,当前计数为"+doneSignal.getCount());
      } catch (InterruptedException ex) {
      }
    }
  }

  
  
  /**
   * 简单的使用CountDownLatch进行业务逻辑执行
   * 计数器记录两个线程的任务,完成之后,执行其他的任务
   */
  void basicUse() {
    final int countNum = 2;
    System.out.println("计数器初始化数据为:2");
    CountDownLatch cdl = new CountDownLatch(countNum);
    ExecutorService pool = Executors.newFixedThreadPool(30);
    for(int i = 0;i<2;i++) {
      final int turn = i+1;
      pool.execute(() -> {
        try {
          long randomCount = (new Random().nextInt(10)+1) * 1000l;
          System.out.println("第"+turn+"个线程随机执行x秒!"+randomCount);
          Thread.sleep(randomCount);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        cdl.countDown();
        System.out.println("第"+turn+"个线程执行完之后,计数减1");
      }); 
    }
    try {
      System.out.println("等待所有线程执行完毕...");
      cdl.await();
      System.out.println("计数减为0后,业务继续执行");
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
  
  
}

 

CyclicBarrier:

package com.chl.thread;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 *
 * 栅栏类-CyclicBarrier 
 * @author chenhailong
 * @date 2019年6月10日 下午4:02:27 
 */
public class CyclicBarrierTest {

  /**
   * @param args
   */
  public static void main(String[] args) {
    
    new CyclicBarrierTest().basicUser();
  }
  
  /**
   * cyclicBarrier的基本使用
   * 10个线程任务全部执行完之后,才继续执行下面的任务
   */
  void basicUser() {
    final int threadNum = 10;
    CyclicBarrier cb = new CyclicBarrier(10);
    ExecutorService pool = Executors.newFixedThreadPool(threadNum);
    for(int i = 0;i<threadNum;i++ ) {
      final int turn = i;
      pool.execute(() -> {
        long randomCount = (new Random().nextInt(10)+1) * 1000l;
        System.out.println("第"+turn+"个线程随机执行"+randomCount+"秒!");
        try {
          Thread.sleep(randomCount);
          cb.await();
          System.out.println("全部执行完毕,所有任务开始继续执行");
        } catch (InterruptedException | BrokenBarrierException e) {
          e.printStackTrace();
        }
      });
    }
    
//    for(;;) {
//      int waiting = cb.getNumberWaiting();
//      try {
//        Thread.sleep(3 * 1000l);
//      } catch (InterruptedException e) {
//        e.printStackTrace();
//      }
//      System.out.println("等待中的数量"+waiting);
//    }
  }
  
  
  /**
   * JDK1.8的源码示例代码说明
   * @author chenhailong
   * @date 2019年6月10日 下午4:49:39
   */
  class Solver{
    final int N;
    final float[][] data;
    final CyclicBarrier barrier;
    
    public Solver(float[][] matrix) {
      data = matrix;
      N = matrix.length;
      Runnable barrierAction = new Runnable() {
        @Override
        public void run() {
          //TODO 
          //do some mergeRows operations
          System.out.println("merge Action!");
        }
      };
      barrier = new CyclicBarrier(N,barrierAction);
      
      List<Thread> threads = new ArrayList<Thread>(N);
      for(int i = 0;i<N;i++) {
        Thread thread = new Thread(new Worker(i));
        threads.add(thread);
        thread.start();
      }
      
      //wait until done
      for(Thread thread: threads) {
        try {
          thread.join();
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      }
    }
    
    class Worker implements Runnable{
      int myRow ;
      Worker(int row){
        myRow = row;
      }
      @Override
      public void run() {
        while(true) {
          System.out.println("continue;");
          try {
            barrier.await();
            break;
          } catch (InterruptedException e) {
            e.printStackTrace();
          } catch (BrokenBarrierException e) {
            e.printStackTrace();
          }
        }
        
      }
      
    }
  }
 
}

 

Phaser:

package com.chl.thread;

import java.util.concurrent.Phaser;

/**
 * Java‘阶段器’工具类Phaser,类似于CyclicBarrier的多次实现,功能更多
 * @author chenhailong
 * @date 2019年6月10日 下午5:22:06
 */
public class PhaserTest {

  /**
   * @param args
   */
  public static void main(String[] args) {
    // phaser简单处理
    // basicTest();

    // phaser另一个示例
    example();
  }

  /**
   * 简单实现phaser的功能
   */
  static void basicTest() {
    doPhaser dp = new doPhaser();
    MissionTask[] mt = new MissionTask[5]; // task数组
    for (int count = 0; count < mt.length; count++) {
      mt[count] = new MissionTask(dp);
      dp.register(); // 注册,表示phaser需要维护的线程个数
    }
    //开启线程
    Thread[] thread = new Thread[mt.length];
    for (int i = 0; i < thread.length; i++) {
      thread[i] = new Thread(mt[i], "mission" + i);
      thread[i].start();
    }
    //等待所有线程结束
    for (int i = 0; i < mt.length; i++) {
      try {
        thread[i].join(); // 等待所有任务结束,继续执行后边的逻辑
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    System.out.println("all finished!");
  }

  /**
   * 继承phaser类,并重写onAdvance方法,对Phaser的每一阶段进行处理
   * @author chenhailong
   * @date 2019年6月10日 下午7:43:42
   */
  static class doPhaser extends Phaser {
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
      try {
        switch (phase) {
          case 0:
            Thread.sleep(1 * 1000l);
            System.out.println("第一阶段");
            break;
          case 1:
            Thread.sleep(1 * 2000l);
            System.out.println("第二阶段");
            break;
          case 2:
            Thread.sleep(1 * 3000l);
            System.out.println("第三阶段");
            break;
          case 3:
            Thread.sleep(1 * 4000l);
            System.out.println("第四阶段");
            break;
          case 4:
            Thread.sleep(1 * 5000l);
            System.out.println("第五阶段");
            break;
          default:
            System.out.println("nono!");
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
      return super.onAdvance(phase, registeredParties);
    }
  }

  /**
   * 根据业务逻辑将线程执行过程分层
   * @author chenhailong
   * @date 2019年6月10日 下午7:47:54
   */
  static class MissionTask implements Runnable {
    private Phaser pha;

    MissionTask(Phaser pha) {
      this.pha = pha;
    }

    @Override
    public void run() {
      System.out.println("开始入场!");
      pha.arriveAndAwaitAdvance();
      System.out.println("开始选择!");
      pha.arriveAndAwaitAdvance();
      System.out.println("开始交流!");
      pha.arriveAndAwaitAdvance();
      System.out.println("交易结束");
      pha.arriveAndAwaitAdvance();
      System.out.println("游戏结束");
      pha.arriveAndAwaitAdvance();
    }
  }

  /***
   * =========================================== 第二个示例
   */
  static void example() {
    SwimingPhaser sp = new SwimingPhaser();
    for (int i = 0; i < 5; i++) {
      sp.register();
      new Thread(new SwimingTask(sp), "swiming" + i).start();
    }
  }

  static class SwimingTask implements Runnable {
    private Phaser p;

    public SwimingTask(Phaser p) {
      this.p = p;
    }

    @Override
    public void run() {
      System.out.println(Thread.currentThread().getName() + "选手入场");
      p.arriveAndAwaitAdvance();
      System.out.println(Thread.currentThread().getName() + "开始比赛");
      p.arriveAndAwaitAdvance();
      System.out.println(Thread.currentThread().getName() + "结束比赛");
      p.arriveAndAwaitAdvance();
    }
  }

  static class SwimingPhaser extends Phaser {
    private int phaseToTerminate = 2;//阶段个数

    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
      System.out.println(phase + "----" + registeredParties);
      try {
        Thread.sleep(2 * 1000l);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      System.out.println("第" + phase + "阶段");
//      return super.onAdvance(phase, registeredParties); //效果和上一样
      return phase == phaseToTerminate || registeredParties == 0; // 返回true表示phaser逻辑结束
    }
  }

}

 

Exchanger:

package com.chl.thread;

import java.util.concurrent.Exchanger;

/**
 * java工具类Exchanger,两个工作线程之间交换数据
 * 
 * @author chenhailong
 * @date 2019年6月11日 上午10:39:43
 */
public class ExchangerTest {

  /**
   * @param args
   */
  public static void main(String[] args) {
    Exchanger<Integer> a = new Exchanger<Integer>();
    Fills f = new Fills(a);
    Pulls p = new Pulls(a);
    
    new Thread(f).start();
    new Thread(p).start();
    
  }

  static class Fills implements Runnable {
    private Exchanger<Integer> exchanger;

    Fills(Exchanger<Integer> exchanger) {
      this.exchanger = exchanger;
    }
    int water = 10;
    @Override
    public void run() {
      for (;;) {
        try {
          Thread.sleep(2 * 1000l);
        } catch (InterruptedException e1) {
          e1.printStackTrace();
        }
        System.out.println("the current water is " + water);
        if (water == 0) {
          try {
            water = exchanger.exchange(water);
            System.out.println("after change data the water is " + water);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        } else {
          water--;
        }
      }
    }
  }

  static class Pulls implements Runnable {
    private Exchanger<Integer> exchanger;

    Pulls(Exchanger<Integer> exchanger) {
      this.exchanger = exchanger;
    }
    int air = 0;
    @Override
    public void run() {
      for (;;) {
        try {
          Thread.sleep(2 * 1000l);
        } catch (InterruptedException e1) {
          e1.printStackTrace();
        }
        System.out.println("the current air is " + air);
        if (air == 10) {
          try {
            air = exchanger.exchange(air);
            System.out.println("after change data the air is " + air);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        } else {
          air++;
        }
      }
    }
  }
}

 

ThreadLocal:

package com.chl.thread;

import java.util.Random;

/**
 * java本地线程变量ThreadLocal,在每个线程内对该变量创建一个副本,在线程任何地方都可以使用,相互之间不影响。但是要考虑内存的占用率
 * 
 * @author chenhailong
 * @date 2019年6月11日 下午1:48:38
 */
public class ThreadLocalTest {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		ThreadLocalTest tt = new ThreadLocalTest();

		new Thread(tt.new test()).start();
		new Thread(tt.new print()).start();
	}

	ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>();

	class test implements Runnable {
		
		@Override
		public void run() {
			try {
				for(int i = 0;i<10;i++) {
					Thread.sleep(500);
					threadLocal.set(i);
					System.out.println("current set-"+i+";thread-1-"+threadLocal.get());
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}			
		}
		
	}

	class print implements Runnable {
		
		@Override
		public void run() {
			try {
				for(int i = 10;i>0;i--) {
					Thread.sleep(1 * 1000);
					threadLocal.set(i);
					System.out.println("current set-"+i+";thread-2-"+threadLocal.get());
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}			
		}
	}
}

 

gitee地址:https://gitee.com/deathearth/Thread


   原创文章,转载请标明本文链接: java并发包多线程编程,Semaphore,CountDownLatch,CyclicBarrier,Phaser和FutureTask,Callback,Runnable,Thread,Exchanger,ThreadLocal的简单说明及示例

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

发表评论

电子邮件地址不会被公开。 必填项已用*标注

更多阅读