- 浏览: 524522 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
飞天奔月:
public List<String> gener ...
实践中的重构30_不做油漆匠 -
在世界的中心呼喚愛:
在世界的中心呼喚愛 写道public class A {
...
深入理解ReferenceQueue GC finalize Reference -
在世界的中心呼喚愛:
在世界的中心呼喚愛 写道在世界的中心呼喚愛 写道在classB ...
深入理解ReferenceQueue GC finalize Reference -
在世界的中心呼喚愛:
在世界的中心呼喚愛 写道在classB的finalize上打断 ...
深入理解ReferenceQueue GC finalize Reference -
在世界的中心呼喚愛:
iteye比较少上,如果可以的话,可以发e-mail交流:ch ...
深入理解ReferenceQueue GC finalize Reference
一次hadoop的read
getFileSystem
代码
Configuration
Configuration基本就是一个空对象。添加了2个配置文件到资源列表。
第一次通过Configuration获取param时才触发资源加载解析。
文件系统的cache
由URI uri, Configuration conf作为key,对FileSystem做了缓存。
初始化文件系统
由config中的fs.hdfs.impl得到文件系统的实现类。这里就是org.apache.hadoop.hdfs.DistributedFileSystem。初始化DistributedFileSystem,这样DistributedFileSystem就可以和namenode通信了。
Read file content
代码
解析path
文件的path为
hdfs://192.168.81.130:9001/user/allen/input4wordcount/test_text_01.txt
解析后为
Scheme hdfs
Authority 192.168.81.130:9001
Path /user/allen/input4wordcount/test_text_01.txt
打开FSDataInputStream
联系namenode取到block信息,注意这里是一个范围查询。查询结果缓存起来。
LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
prefetchSize = 671088640
在cache中查找block
注意这里的Comparator有一个特殊处理。为了fake key可以和待查找的LocatedBlock相等。
如果cache不命中则重新查询namenode
更新原有cache
选择datanode
当和datanode建立连接时,如果出错。则3秒后(程序hard code)联系namenode重新获取datanode的信息。当重试超过一定次数时,则报错。
建立连接,读取内容
注意这里有一个简单的文件协议。
getFileSystem
代码
public static FileSystem getFileSystem() throws Exception { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get( URI.create("hdfs://192.168.81.130:9001"), conf); return fs; }
Configuration
Configuration基本就是一个空对象。添加了2个配置文件到资源列表。
addDefaultResource("core-default.xml"); addDefaultResource("core-site.xml");
第一次通过Configuration获取param时才触发资源加载解析。
文件系统的cache
static class Cache { private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>(); FileSystem get(URI uri, Configuration conf) throws IOException{ Key key = new Key(uri, conf); FileSystem fs = null; synchronized (this) { fs = map.get(key); } if (fs != null) { return fs; } fs = createFileSystem(uri, conf); synchronized (this) { // refetch the lock again FileSystem oldfs = map.get(key); if (oldfs != null) { // a file system is created while lock is releasing fs.close(); // close the new file system return oldfs; // return the old file system } // now insert the new file system into the map if (map.isEmpty() && !clientFinalizer.isAlive()) { Runtime.getRuntime().addShutdownHook(clientFinalizer); } fs.key = key; map.put(key, fs); return fs; } }
由URI uri, Configuration conf作为key,对FileSystem做了缓存。
初始化文件系统
private static FileSystem createFileSystem(URI uri, Configuration conf ) throws IOException { Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null); LOG.debug("Creating filesystem for " + uri); if (clazz == null) { throw new IOException("No FileSystem for scheme: " + uri.getScheme()); } FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf); fs.initialize(uri, conf); return fs; }
由config中的fs.hdfs.impl得到文件系统的实现类。这里就是org.apache.hadoop.hdfs.DistributedFileSystem。初始化DistributedFileSystem,这样DistributedFileSystem就可以和namenode通信了。
Read file content
代码
/** * linux cat file. * */ public static void readFile(String path) throws Exception { System.out.println("--------------------------------------"); System.out.println("reading file on path = " + path); FileSystem fs = Common.getFileSystem(); InputStream in = null; try { in = fs.open(new Path(path)); IOUtils.copyBytes(in, System.out, 4096, false); } finally { IOUtils.closeStream(in); } System.out.println("--------------------------------------"); }
解析path
文件的path为
hdfs://192.168.81.130:9001/user/allen/input4wordcount/test_text_01.txt
解析后为
Scheme hdfs
Authority 192.168.81.130:9001
Path /user/allen/input4wordcount/test_text_01.txt
打开FSDataInputStream
联系namenode取到block信息,注意这里是一个范围查询。查询结果缓存起来。
LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
prefetchSize = 671088640
在cache中查找block
public int findBlock(long offset) { // create fake block of size 1 as a key LocatedBlock key = new LocatedBlock(); key.setStartOffset(offset); key.getBlock().setNumBytes(1); Comparator<LocatedBlock> comp = new Comparator<LocatedBlock>() { // Returns 0 iff a is inside b or b is inside a public int compare(LocatedBlock a, LocatedBlock b) { long aBeg = a.getStartOffset(); long bBeg = b.getStartOffset(); long aEnd = aBeg + a.getBlockSize(); long bEnd = bBeg + b.getBlockSize(); if(aBeg <= bBeg && bEnd <= aEnd || bBeg <= aBeg && aEnd <= bEnd) return 0; // one of the blocks is inside the other if(aBeg < bBeg) return -1; // a's left bound is to the left of the b's return 1; } }; return Collections.binarySearch(blocks, key, comp); }
注意这里的Comparator有一个特殊处理。为了fake key可以和待查找的LocatedBlock相等。
如果cache不命中则重新查询namenode
int targetBlockIdx = locatedBlocks.findBlock(offset); if (targetBlockIdx < 0) { // block is not cached targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx); // fetch more blocks LocatedBlocks newBlocks; newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize); assert (newBlocks != null) : "Could not find target position " + offset; locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks()); }
更新原有cache
public void insertRange(int blockIdx, List<LocatedBlock> newBlocks) { int oldIdx = blockIdx; int insStart = 0, insEnd = 0; for(int newIdx = 0; newIdx < newBlocks.size() && oldIdx < blocks.size(); newIdx++) { long newOff = newBlocks.get(newIdx).getStartOffset(); long oldOff = blocks.get(oldIdx).getStartOffset(); if(newOff < oldOff) { insEnd++; } else if(newOff == oldOff) { // replace old cached block by the new one blocks.set(oldIdx, newBlocks.get(newIdx)); if(insStart < insEnd) { // insert new blocks blocks.addAll(oldIdx, newBlocks.subList(insStart, insEnd)); oldIdx += insEnd - insStart; } insStart = insEnd = newIdx+1; oldIdx++; } else { // newOff > oldOff assert false : "List of LocatedBlock must be sorted by startOffset"; } } insEnd = newBlocks.size(); if(insStart < insEnd) { // insert new blocks blocks.addAll(oldIdx, newBlocks.subList(insStart, insEnd)); } }
选择datanode
/** * Pick the best node from which to stream the data. * Entries in <i>nodes</i> are already in the priority order */ private DatanodeInfo bestNode(DatanodeInfo nodes[], AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes) throws IOException { if (nodes != null) { for (int i = 0; i < nodes.length; i++) { if (!deadNodes.containsKey(nodes[i])) { return nodes[i]; } } } throw new IOException("No live nodes contain current block"); }
当和datanode建立连接时,如果出错。则3秒后(程序hard code)联系namenode重新获取datanode的信息。当重试超过一定次数时,则报错。
建立连接,读取内容
注意这里有一个简单的文件协议。
发表评论
-
hbase分页功能的几种实现方案
2015-01-13 23:52 5370hbase分页功能的几种实现方案。 分页功能是线上系统的常用 ... -
simplehbase v0.98.1开始支持hbase0.98
2014-12-29 21:48 1019https://github.com/zhang-xzhi/s ... -
hbase轻量级中间件simplehbase v1.0简介
2014-12-13 18:55 1364https://github.com/zhang-xzhi/s ... -
hbase put UML图
2014-12-11 23:40 1272create Htable put hbase rpc ... -
hbase开发问题-PooledHTable多次close导致问题
2014-12-11 23:27 1998PooledHTable多次close导致问题 Pooled ... -
hbase的CoprocessorProtocol及一个简单的通用扩展实现V2
2014-12-04 18:00 2075hbase中的CoprocessorProtocol机制. ... -
hbase 0.94.0 0.94.9 0.94.24 功能不兼容初步分析
2014-12-04 16:10 1087hbase 0.94.0 0.94.9 0.94.24 功能不 ... -
simplehbase对JOPO新增xml配置和无配置方式
2014-10-24 22:50 972simplehbase介绍文章如下: https://gith ... -
hbase轻量级中间件simplehbase v0.9简介
2014-07-14 13:57 613https://github.com/zhang-xzhi/s ... -
hbase轻量级中间件simplehbase v0.8简介
2014-04-28 21:44 3726https://github.com/zhang-xzhi/s ... -
hbase开发问题-hbase版本号报错
2014-04-22 19:19 2734由于使用了自定义的classloader,导致报错。 p ... -
HBase Client使用注意点
2014-04-21 12:51 2525HBase Client使用注意点: 1 HTable线程 ... -
hbase开发问题-hbase-0.94.0的ServerCallable callTimeout处理有问题
2014-04-14 22:07 2160读hbase-0.94.0的ServerCallable时,发 ... -
Phoenix和simplehbase功能简单比较
2014-04-02 17:20 1600Phoenix和simplehbase功能简单比较 大数据应 ... -
hbase web console simplehbaseviewer
2014-03-12 19:11 1217https://github.com/zhang-xzhi/s ... -
hbase轻量级中间件simplehbase v0.2简介
2013-12-19 23:51 1655https://github.com/zhang-xzhi/s ... -
hbase轻量级中间件simplehbase v0.1简介
2013-10-09 19:29 1529simplehbase尝试简化基于hbase的java应用开发 ... -
hbase的CoprocessorProtocol及一个简单的通用扩展实现V1
2013-08-18 14:15 5268hbase的CoprocessorProtocol及一个简单的 ... -
hbase的基本操作
2013-08-18 14:02 4457本文列举一些hbase的基本操作代码。 package ... -
hadoop_hadoop的map reduce
2011-11-09 21:21 1218这个根据功能模块分为 ...
相关推荐
实验数据结果表明,由于Spark平台主要基于分布式的内存计算,而Hadoop中的Mapreduce框架在每个map或reduce阶段存在回写或读取硬盘操作,所以Spark的性能优势远远在于Hadoop之上,但前者以使用大量内存进行数据存贮或...
winutils.exe and hadoop.dll 在 win7_64 环境亲测通过,解压的winutils.exe放在...\hadoop-2.8.1\bin\下,hadoop.dll放在...\Windows\system32\下, 我安装的是Java 1.8.0_151,python3.6.3,spark2.2.0+hadoop...
ducat_hadoop_demos Hadoop演示 A. MsWordCount此项目将允许用户将MS word .doc文件加载到HDFS中,然后使用该文件来计算单词计数示例。 所面对的挑战是- Word文档需要使用来自Apache POI库的单独的记录读取器
■ HDFS 提供什么功能 ■ HDFS 如何读取和写入文件 ■ NameNode 如何使用内存 ■ Hadoop 如何提供文件安全性 ■ 如何使用NameNode Web UI ■ 如何使用Hadoop 文件Shell
hadoop本身不支持windows平台,本文件用于在idea中用java代码连接远程hadoop时读取本地文件,以下是用法 1. 将hadoop文件从linux下载到windows硬盘中 2. 添加hadoop文件夹到环境变量:HADOOP_HOME ~\hadoop-2.7.6\sbin;...
hadoop读取完毕,在往本地保存文件时出现的错误 在win10中配置hadoop开发环境时没有在c:\windows\system32目录中拷贝hadoop.dll文件 2、 错误: org.apache.hadoop.io.nativeio.NativeIO$Win dows....
Hadoop适合于一次计算,多次读取的场景,如搜索引擎,只支持随机读取不支持随机写入,如Hadoop和Lucene的集成就不能够直接集成,因为Lucene支持随机写入。 本文将从使用的角度上谈了如何搭建Hadoop、如何配置...
用于win7开发、调试下读取HDFS文件(包括hive读取),hadoop2.6.3
本书从Hadoop的缘起开始,由浅入深,结合理论和实践,全方位地介绍Hadoop这一高性能处理海量数据集的理想工具。全书共16章,3个附录,涉及的主题包括:Haddoop简介;MapReduce简介;Hadoop分布式文件系统;Hadoop...
039 HDFS文件系统读写流程及HDFS API两种方式读取文件 040 详解HDFS API之FileSystem方式基本操作二 041 讲解分析Configuration和FileSystem类源代码 042 引出HDFS实际应用场景之合并文件和使用getmerge命令并查看...
必须通过hadoop fs 命令来读取。支持分布式。 MapReduce : 大型分布式数据处理模型,是Google MapReduce的开源实现。 •合并/计算模型。 其他相关组成: •Hbase:结构化分部式数据库。BigTable的开源实现。 •...
一.文件操作 1.上传本地文件到hadood 2.在hadoop中新建文件,并写入 3.删除hadoop上的文件 4.读取文件 5.文件修改时间 二.目录操作 1.在hadoop上创建目录 2.删除目录 3.读取某个目录下的所有文件 三.hdfs信息 1.查找...
从Hadoop URL中读取数据 通过FileSystem API读取数据 写入数据 目录 查询文件系统 删除数据 数据流 文件读取剖析 文件写入剖析 一致模型 通过 distcp并行拷贝 保持 HDFS 集群的均衡 Hadoop的归档文件 使用Hadoop归档...
aws-java-sdk-1.7.4.jar hadoop-aws-2.7.2.jar 引入代码即可在读写中国区亚马逊s3
从hadoop hdfs中读取数据,进行groupby 显示统计结果count、avg、max,用文字和柱状图两种图形界面表示
1 Redis安装练习 问题 依次完成以下2小题的任务: 1) 安装Redis 2) 配置复制
合理的为用户做出推荐,推荐的结果可靠程度很高,这就是我的优势所在,因为它和一般的推荐系统的推荐算法不太一样,我的推荐算法是利用Hadoop技术写的,我们可以利用Hadoop集群的高吞吐量,一次读取多次写入等特点...
自己写的PPT,详解Hadoop源码及其相关流程
Hadoop中的IFile接口的源代码分析