并行程序设计模式-master-worker模式

master-worker核心思想

这里写图片描述

master-worker模式优点

这里写图片描述

master-worker模式结构

这里写图片描述

master-worker模式主要参与者

这里写图片描述

master-worker样例代码结构

这里写图片描述

代码

1.Master.java

package test;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Coordinator of the Master-Worker pattern.
 *
 * <p>Holds the queue of submitted sub-tasks, the worker threads, and the map
 * into which results are written. One {@code Worker} instance is shared by
 * every thread created here.
 */
public class Master {

    // Sub-task results; handed to the worker in the constructor.
    protected Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>();

    // Worker threads, keyed by their index rendered as a string.
    protected Map<String, Thread> threadMap = new HashMap<String, Thread>();

    // Pending sub-tasks; handed to the worker in the constructor.
    protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();

    /**
     * Wires the worker to this master's queue and result map, then creates
     * (but does not start) the worker threads.
     *
     * @param worker      the task logic run by every thread
     * @param countWorker number of threads to create; thread names are "0".."countWorker-1"
     */
    public Master(Worker worker, int countWorker) {
        worker.setWorkQueue(workQueue);
        worker.setResultMap(resultMap);
        int index = 0;
        while (index < countWorker) {
            String name = Integer.toString(index);
            threadMap.put(name, new Thread(worker, name));
            index++;
        }
    }

    /**
     * @return the live result map itself (not a copy)
     */
    public Map<String, Object> getResultMap() {
        return resultMap;
    }

    /**
     * Appends one sub-task to the work queue.
     *
     * @param job the sub-task to enqueue
     */
    public void submit(Object job) {
        workQueue.add(job);
    }

    /**
     * Starts every worker thread created by the constructor.
     */
    public void execute() {
        for (Thread thread : threadMap.values()) {
            thread.start();
        }
    }

    /**
     * @return {@code true} only when every worker thread is in state {@code TERMINATED}
     */
    public boolean isComplete() {
        for (Thread thread : threadMap.values()) {
            boolean finished = thread.getState() == Thread.State.TERMINATED;
            if (!finished) {
                return false;
            }
        }
        return true;
    }

}

2.Worker.java

package test;

import java.util.Map;
import java.util.Queue;

/**
 * Base worker of the Master-Worker pattern.
 *
 * <p>{@link #run()} drains the work queue, passes each item to
 * {@link #handle(Object)} and stores the returned value in the result map.
 * Subclasses override {@code handle} to supply the actual task logic.
 *
 * <p>Results are keyed by the decimal string of the input's {@code hashCode()},
 * so two inputs with the same hash code overwrite each other's result.
 */
public class Worker implements Runnable {

    // Task queue from which sub-tasks are taken.
    protected Queue<Object> workQueue;

    // Result set that receives the outcome of each sub-task.
    protected Map<String, Object> resultMap;

    /**
     * @param workQueue the queue this worker polls for sub-tasks
     */
    public void setWorkQueue(Queue<Object> workQueue) {
        this.workQueue = workQueue;
    }

    /**
     * @param resultMap the map this worker writes results into
     */
    public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    /**
     * Processes one sub-task. This default implementation returns the input unchanged.
     *
     * @param input the sub-task taken from the queue
     * @return the result to store for that sub-task
     */
    public Object handle(Object input) {
        return input;
    }

    /**
     * Polls the work queue until it yields {@code null}, handling each item
     * and recording its result.
     */
    @Override
    public void run() {
        Object input;
        while ((input = workQueue.poll()) != null) {
            // Process the sub-task.
            Object result = handle(input);
            // Key is derived after handle() has run, and computed only once.
            String key = Integer.toString(input.hashCode());
            // Write the result into the result set.
            resultMap.put(key, result);
        }
    }

}

3.PlusWorker.java

package test;

public class PlusWorker extends Worker{


    public Object handle(Object input){
        Integer i =(Integer)input;
        return i*i*i;
    }
}

4.TestMain.java

package test;

import java.util.Map;
import java.util.Set;

/**
 * Demo driver: cubes the integers 0..99 on five worker threads and sums the
 * results while the workers are still running.
 */
public class TestMain {

    /**
     * Submits the tasks, starts the workers, then repeatedly takes one entry
     * out of the result map and adds it to the running total until the workers
     * have finished and the map is empty.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        Master m = new Master(new PlusWorker(), 5);
        for (int i = 0; i < 100; i++) {
            m.submit(i);
        }
        m.execute();
        int re = 0;
        Map<String, Object> resultMap = m.getResultMap();
        // Completion must be checked BEFORE emptiness. With the opposite order a
        // worker could publish its last result and terminate between the two
        // checks, and the loop would exit with that result never summed.
        while (!m.isComplete() || !resultMap.isEmpty()) {
            // Pick any one key currently present.
            String key = null;
            for (String k : resultMap.keySet()) {
                key = k;
                break;
            }
            if (key == null) {
                // Nothing available yet; workers are still running.
                continue;
            }
            System.out.println("removekey:" + key);
            // Only this thread removes entries, so remove() yields the value for the key.
            Integer value = (Integer) resultMap.remove(key);
            if (value != null) {
                re = re + value;
            }
        }
        System.out.println("re" + re);
    }

}
版权声明:本文为lvyuan30276原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/lvyuan30276/article/details/78894699