STORM入门之(集成ElasticSearch)

本片文章基于本专题Demo进行 传送门:http://blog.csdn.net/column/details/17004.html

由于Storm集成ES过于陈旧,所以会照成连接ES客户端抛出node不可用异常,ES集群搭建为2.4.1版本 所以无论如何连接都是node不可用,解决方法修改Storm源码。

主要修改源码的连接ES部分,构建集群客户端,修正文件4个 如图:



修改后的源码

NewEsConfig

    TransportAddress[] getTransportAddresses() {
        String[] ns = nodes;
        TransportAddress[] addressArr = new TransportAddress[ns.length];
        for (int i = 0; i < ns.length; i++) {
            try {
                addressArr[i] = new InetSocketTransportAddress(InetAddress.getByName(ns[i]), 9300);
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }
        }

        return addressArr;
    }

    Settings toBasicSettings() {
        return Settings.settingsBuilder()
                .put("cluster.name", clusterName)
                .put("transport.tcp.compress", true)
                .build();
    }

NewStormElasticSearchClient

   public Client construct() {
        Settings settings = esConfig.toBasicSettings();
        TransportClient transportClient = TransportClient.builder().settings(settings).build().addTransportAddresses(esConfig.getTransportAddresses());
        return transportClient;
    }

Topology构建

 /**
     * 构建ElasticBolt
     */
    private static void builtEsIndexBolt(TopologyBuilder builder){
        NewEsConfig esConfig = new NewEsConfig("ES-CLS", new String[]{"10.2.4.15","10.2.4.42","10.2.4.43"});
        EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
        NewEsIndexBolt indexBolt = new NewEsIndexBolt(esConfig, tupleMapper);
        builder.setBolt("es-bolt",indexBolt,1).shuffleGrouping("BoltA");
    }

Bolt发送方式

 public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("source", "index","type","id"));
    }

入库Json

{"id":1,"ide":"eclipse","name":"Java"}

结果


版权声明:本文为yl3395017原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/yl3395017/article/details/77496034