博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Storm【开发实战】- 流方式的统计系统
阅读量:6611 次
发布时间:2019-06-24

本文共 12166 字,大约阅读时间需要 40 分钟。

hot3.png

1: 初期硬件准备:

                1 如果条件具备:请保证您安装好了 redis集群

                2 配置好您的Storm开发环境

                3 保证好您的开发环境的畅通: 主机与主机之间,Storm与redis之间

2:业务背景的介绍:

                1  在这里我们将模拟一个   流方式的数据处理过程

                 2 数据的源头保存在我们的redis 集群之中

                 3  发射的数据格式为: ip,url,client_key

数据发射器

package storm.spout;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Values;import backtype.storm.tuple.Fields;import org.json.simple.JSONObject;import org.json.simple.JSONValue;import redis.clients.jedis.Jedis;import storm.utils.Conf;import java.util.Map;import org.apache.log4j.Logger;/** * click Spout 从redis中间读取所需要的数据 */public class ClickSpout extends BaseRichSpout {	private static final long serialVersionUID = -6200450568987812474L;	public static Logger LOG = Logger.getLogger(ClickSpout.class);	// 对于redis,我们使用的是jedis客户端	private Jedis jedis;	// 主机	private String host;	// 端口	private int port;	// Spout 收集器	private SpoutOutputCollector collector;	@Override	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {		        // 这里,我们发射的格式为	        // IP,URL,CLIENT_KEY		outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.IP,				storm.cookbook.Fields.URL, storm.cookbook.Fields.CLIENT_KEY));	}	@Override	public void open(Map conf, TopologyContext topologyContext,			SpoutOutputCollector spoutOutputCollector) {		host = conf.get(Conf.REDIS_HOST_KEY).toString();		port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());		this.collector = spoutOutputCollector;		connectToRedis();	}	private void connectToRedis() {		jedis = new Jedis(host, port);	}	@Override	public void nextTuple() {		String content = jedis.rpop("count");		if (content == null || "nil".equals(content)) {			try {				Thread.sleep(300);			} catch (InterruptedException e) {			}		} else {			// 将jedis对象 rpop出来的字符串解析为 json对象			JSONObject obj = (JSONObject) JSONValue.parse(content);			String ip = obj.get(storm.cookbook.Fields.IP).toString();			String url = obj.get(storm.cookbook.Fields.URL).toString();			String clientKey = obj.get(storm.cookbook.Fields.CLIENT_KEY)					.toString();			System.out.println("this is a clientKey");			// List tuple对象			collector.emit(new Values(ip, url, clientKey));		}	}}

在这个过程之中,请注意:

1  我们在 OPEN 方法之中初始化   host,port,collector,以及Redis的连接,调用Connect方法并连接到redis数据库

2 我们在nextTupe 取出数据,并且将他转换为一个JSON对象,并且拿到 ip,url,clientKey,同时将他们包装成为一个

Values对象

让我们来看看数据的流向图:

152941_JVSx_1791874.png

        

在我们的数据从clickSpout 读取以后,接下来,我们将采用2个bolt

                                    1  : repeatVisitBolt 

                                    2   :  geographyBolt 

共同来读取同一个数据源的数据:clickSpout

3 细细察看 repeatVisitBolt

package storm.bolt;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import redis.clients.jedis.Jedis;import storm.utils.Conf;import java.util.Map;public class RepeatVisitBolt extends BaseRichBolt {	private OutputCollector collector;	private Jedis jedis;	private String host;	private int port;	@Override	public void prepare(Map conf, TopologyContext topologyContext,			OutputCollector outputCollector) {		this.collector = outputCollector;		host = conf.get(Conf.REDIS_HOST_KEY).toString();		port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());		connectToRedis();	}	private void connectToRedis() {		jedis = new Jedis(host, port);		jedis.connect();	}	public boolean isConnected() {		if (jedis == null)			return false;		return jedis.isConnected();	}	@Override	public void execute(Tuple tuple) {		String ip = tuple.getStringByField(storm.cookbook.Fields.IP);		String clientKey = tuple				.getStringByField(storm.cookbook.Fields.CLIENT_KEY);		String url = tuple.getStringByField(storm.cookbook.Fields.URL);		String key = url + ":" + clientKey;		String value = jedis.get(key);				// redis中取,如果redis中没有,就插入新的一条访问记录。		if (value == null) {			jedis.set(key, "visited");			collector.emit(new Values(clientKey, url, Boolean.TRUE.toString()));		} else {			collector					.emit(new Values(clientKey, url, Boolean.FALSE.toString()));		}	}	@Override	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {		outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(				storm.cookbook.Fields.CLIENT_KEY, storm.cookbook.Fields.URL,				storm.cookbook.Fields.UNIQUE));	}}

  在这里,我们把url 和 clientKey 组合成为 【url:clientKey】的格式组合,并依据这个对象,在redis中去查找,如果没有,那那Set到redis中间去,并且判定它为【unique】

4:

package storm.bolt;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.Map;public class VisitStatsBolt extends BaseRichBolt {    private OutputCollector collector;    private int total = 0;    private int uniqueCount = 0;    @Override    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {        this.collector = outputCollector;    }    @Override    public void execute(Tuple tuple) {    	    	//在这里,我们在上游来判断这个Fields 是否是独特和唯一的        boolean unique = Boolean.parseBoolean(tuple.getStringByField(storm.cookbook.Fields.UNIQUE));                total++;        if(unique)uniqueCount++;        collector.emit(new Values(total,uniqueCount));    }    @Override    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {        outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(storm.cookbook.Fields.TOTAL_COUNT,        		storm.cookbook.Fields.TOTAL_UNIQUE));    }}

第一次出现,uv ++ 

5  接下来,看看流水线2 :

package storm.bolt;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import org.json.simple.JSONObject;import storm.cookbook.IPResolver;import java.util.HashMap;import java.util.List;import java.util.Map;/** * User: yin shaui Date: 2014/05/21 Time: 8:58 AM To change this template use * File | Settings | File Templates. */public class GeographyBolt extends BaseRichBolt {	// ip解析器	private IPResolver resolver;	private OutputCollector collector;	public GeographyBolt(IPResolver resolver) {		this.resolver = resolver;	}	@Override	public void prepare(Map map, TopologyContext topologyContext,			OutputCollector outputCollector) {		this.collector = outputCollector;	}	@Override	public void execute(Tuple tuple) {		// 1 从上级的目录之中拿到我们所要使用的ip		String ip = tuple.getStringByField(storm.cookbook.Fields.IP);		// 将ip 转换为json		JSONObject json = resolver.resolveIP(ip);		// 将 city和country 组织成为一个新的元祖,在这里也就是我们的Values对象		String city = (String) json.get(storm.cookbook.Fields.CITY);		String country = (String) json.get(storm.cookbook.Fields.COUNTRY_NAME);		collector.emit(new Values(country, city));	}	@Override	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {		// 确定了我们这次输出元祖的格式		outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.COUNTRY,				storm.cookbook.Fields.CITY));	}}

以上Bolt,完成了一个Ip到 CITY,COUNTRY 的转换

package storm.bolt;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.HashMap;import java.util.LinkedList;import java.util.List;import java.util.Map;public class GeoStatsBolt extends BaseRichBolt {	private class CountryStats {		//		private int countryTotal = 0;		private static final int COUNT_INDEX = 0;		private static final int PERCENTAGE_INDEX = 1;		private String countryName;		public CountryStats(String countryName) {			this.countryName = countryName;		}		private Map
> cityStats = new HashMap
>(); /**  * @param cityName  */ public void cityFound(String cityName) { countryTotal++; // 已经有了值,一个加1的操作 if (cityStats.containsKey(cityName)) { cityStats.get(cityName) .set(COUNT_INDEX, cityStats.get(cityName).get(COUNT_INDEX) .intValue() + 1); // 没有值的时候 } else { List
 list = new LinkedList
(); list.add(1); list.add(0); cityStats.put(cityName, list); } double percent = (double) cityStats.get(cityName).get(COUNT_INDEX) / (double) countryTotal; cityStats.get(cityName).set(PERCENTAGE_INDEX, (int) percent); } /**  * @return 拿到的国家总数  */ public int getCountryTotal() { return countryTotal; } /**  * @param cityName  依据传入的城市名称,拿到城市总数  * @return  */ public int getCityTotal(String cityName) { return cityStats.get(cityName).get(COUNT_INDEX).intValue(); } public String toString() { return "Total Count for " + countryName + " is " + Integer.toString(countryTotal) + "\n" + "Cities:  " + cityStats.toString(); } } private OutputCollector collector; // CountryStats 是一个内部类的对象 private Map
 stats = new HashMap
(); @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; } @Override public void execute(Tuple tuple) { String country = tuple.getStringByField(storm.cookbook.Fields.COUNTRY); String city = tuple.getStringByField(storm.cookbook.Fields.CITY); // 如果国家不存在的时候,新增加一个国家,国家的统计 if (!stats.containsKey(country)) { stats.put(country, new CountryStats(country)); } // 这里拿到新的统计,cityFound 是拿到某个城市的值 stats.get(country).cityFound(city); collector.emit(new Values(country, stats.get(country).getCountryTotal(), city, stats.get(country) .getCityTotal(city))); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields( storm.cookbook.Fields.COUNTRY, storm.cookbook.Fields.COUNTRY_TOTAL, storm.cookbook.Fields.CITY, storm.cookbook.Fields.CITY_TOTAL)); }}

有关地理位置的统计,附带上程序其他的使用类

package storm.cookbook;/** */public class Fields {	public static final String IP = "ip";		public static final String URL = "url";		public static final String CLIENT_KEY = "clientKey";		public static final String COUNTRY = "country";		public static final String COUNTRY_NAME = "country_name";		public static final String CITY = "city";		//唯一的,独一无二的	public static final String UNIQUE = "unique";		//城镇整数	public static final String COUNTRY_TOTAL = "countryTotal";		//城市整数	public static final String CITY_TOTAL = "cityTotal";		//总共计数	public static final String TOTAL_COUNT = "totalCount";		//总共独一无二的	public static final String TOTAL_UNIQUE = "totalUnique";}
package storm.cookbook;import org.json.simple.JSONObject;import org.json.simple.JSONValue;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.io.Serializable;import java.net.MalformedURLException;import java.net.URL;import java.net.URLConnection;public class HttpIPResolver implements IPResolver, Serializable {	static String url = "http://api.hostip.info/get_json.php";	@Override	public JSONObject resolveIP(String ip) {		URL geoUrl = null;		BufferedReader in = null;		try {			geoUrl = new URL(url + "?ip=" + ip);			URLConnection connection = geoUrl.openConnection();			in = new BufferedReader(new InputStreamReader(					connection.getInputStream()));			String inputLine;			JSONObject json = (JSONObject) JSONValue.parse(in);			in.close();			return json;		} catch (IOException e) {			e.printStackTrace();		} finally {			// 每当in为空的时候我们不进行如下的close操作,只有在in不为空的时候进行close操作			if (in != null) {				try {					in.close();				} catch (IOException e) {				}			}		}		return null;	}}

package storm.cookbook;import org.json.simple.JSONObject;/** * Created with IntelliJ IDEA. * User: admin * Date: 2012/12/07 * Time: 5:29 PM * To change this template use File | Settings | File Templates. */public interface IPResolver {	public JSONObject resolveIP(String ip);}

至此,整个流程完毕。 对于统计以后,数据如何持久,亦或是数据数据写回redis的过程,请实践~

转载于:https://my.oschina.net/infiniteSpace/blog/284806

你可能感兴趣的文章
移动运维孔同学的CCIE PASS心得
查看>>
NameNode中的高可用方案
查看>>
大会介绍:
查看>>
mysql 删除日志文件命令详解
查看>>
Linux如何通过PAM限制用户登录失败次数
查看>>
bufio读写
查看>>
UPESB天气查询用例(二)
查看>>
zabbix更换页面LOGO
查看>>
Relay log read failure错误解决
查看>>
CentOS系统局域网YUM本地源配置
查看>>
ansible-role角色:通过一个批量部署nginx范例学习role
查看>>
向SqlParameter内动态添加参数
查看>>
在windows下与linux虚拟机进行文件共享
查看>>
php 图形用户界面GUI 开发
查看>>
正则表达式详解
查看>>
linux文件与目录之权限对比
查看>>
LeetCode问题5
查看>>
AIX系列------ISO挂载
查看>>
如何打开被管理员禁止的注册表编辑器
查看>>
java根据经纬度计算距离
查看>>