`
zhang_xzhi_xjtu
  • 浏览: 524522 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

hadoop_hadoop的一次读取

 
阅读更多
一次hadoop的read
getFileSystem
代码
	public static FileSystem getFileSystem() throws Exception {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(
				URI.create("hdfs://192.168.81.130:9001"), conf);
		return fs;
	}




Configuration
Configuration基本就是一个空对象。添加了2个配置文件到资源列表。
    addDefaultResource("core-default.xml");
addDefaultResource("core-site.xml");

第一次通过Configuration获取param时才触发资源加载解析。



文件系统的cache
static class Cache {
private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
    FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);
      FileSystem fs = null;
      synchronized (this) {
        fs = map.get(key);
      }
      if (fs != null) {
        return fs;
      }
      
      fs = createFileSystem(uri, conf);
      synchronized (this) {  // refetch the lock again
        FileSystem oldfs = map.get(key);
        if (oldfs != null) { // a file system is created while lock is releasing
          fs.close(); // close the new file system
          return oldfs;  // return the old file system
        }

        // now insert the new file system into the map
        if (map.isEmpty() && !clientFinalizer.isAlive()) {
          Runtime.getRuntime().addShutdownHook(clientFinalizer);
        }
        fs.key = key;
        map.put(key, fs);
        return fs;
      }
}

由URI uri, Configuration conf作为key,对FileSystem做了缓存。



初始化文件系统
  private static FileSystem createFileSystem(URI uri, Configuration conf
      ) throws IOException {
    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
    LOG.debug("Creating filesystem for " + uri);
    if (clazz == null) {
      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    fs.initialize(uri, conf);
    return fs;
  }

由config中的fs.hdfs.impl得到文件系统的实现类。这里就是org.apache.hadoop.hdfs.DistributedFileSystem。初始化DistributedFileSystem,这样DistributedFileSystem就可以和namenode通信了。



Read file content
代码
	/**
	 * linux cat file.
	 * */
	public static void readFile(String path) throws Exception {

		System.out.println("--------------------------------------");
		System.out.println("reading file on path = " + path);

		FileSystem fs = Common.getFileSystem();

		InputStream in = null;
		try {
			in = fs.open(new Path(path));
			IOUtils.copyBytes(in, System.out, 4096, false);
		} finally {
			IOUtils.closeStream(in);
		}
		System.out.println("--------------------------------------");
	}



解析path
文件的path为
hdfs://192.168.81.130:9001/user/allen/input4wordcount/test_text_01.txt
解析后为
Scheme hdfs
Authority 192.168.81.130:9001
Path /user/allen/input4wordcount/test_text_01.txt



打开FSDataInputStream
联系namenode取到block信息,注意这里是一个范围查询。查询结果缓存起来。
LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
prefetchSize = 671088640



在cache中查找block
public int findBlock(long offset) {
    // create fake block of size 1 as a key
    LocatedBlock key = new LocatedBlock();
    key.setStartOffset(offset);
    key.getBlock().setNumBytes(1);
    Comparator<LocatedBlock> comp = 
      new Comparator<LocatedBlock>() {
        // Returns 0 iff a is inside b or b is inside a
        public int compare(LocatedBlock a, LocatedBlock b) {
          long aBeg = a.getStartOffset();
          long bBeg = b.getStartOffset();
          long aEnd = aBeg + a.getBlockSize();
          long bEnd = bBeg + b.getBlockSize();
          if(aBeg <= bBeg && bEnd <= aEnd 
              || bBeg <= aBeg && aEnd <= bEnd)
            return 0; // one of the blocks is inside the other
          if(aBeg < bBeg)
            return -1; // a's left bound is to the left of the b's
          return 1;
        }
      };
    return Collections.binarySearch(blocks, key, comp);
  }

注意这里的Comparator有一个特殊处理。为了fake key可以和待查找的LocatedBlock相等。



如果cache不命中则重新查询namenode
      int targetBlockIdx = locatedBlocks.findBlock(offset);
      if (targetBlockIdx < 0) { // block is not cached
        targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
        // fetch more blocks
        LocatedBlocks newBlocks;
        newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
        assert (newBlocks != null) : "Could not find target position " + offset;
        locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
      }

更新原有cache
public void insertRange(int blockIdx, List<LocatedBlock> newBlocks) {
    int oldIdx = blockIdx;
    int insStart = 0, insEnd = 0;
    for(int newIdx = 0; newIdx < newBlocks.size() && oldIdx < blocks.size(); 
                                                        newIdx++) {
      long newOff = newBlocks.get(newIdx).getStartOffset();
      long oldOff = blocks.get(oldIdx).getStartOffset();
      if(newOff < oldOff) {
        insEnd++;
      } else if(newOff == oldOff) {
        // replace old cached block by the new one
        blocks.set(oldIdx, newBlocks.get(newIdx));
        if(insStart < insEnd) { // insert new blocks
          blocks.addAll(oldIdx, newBlocks.subList(insStart, insEnd));
          oldIdx += insEnd - insStart;
        }
        insStart = insEnd = newIdx+1;
        oldIdx++;
      } else {  // newOff > oldOff
        assert false : "List of LocatedBlock must be sorted by startOffset";
      }
    }
    insEnd = newBlocks.size();
    if(insStart < insEnd) { // insert new blocks
      blocks.addAll(oldIdx, newBlocks.subList(insStart, insEnd));
    }
  }



选择datanode
  /**
   * Pick the best node from which to stream the data.
   * Entries in <i>nodes</i> are already in the priority order
   */
  private DatanodeInfo bestNode(DatanodeInfo nodes[], 
                                AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes)
                                throws IOException {
    if (nodes != null) { 
      for (int i = 0; i < nodes.length; i++) {
        if (!deadNodes.containsKey(nodes[i])) {
          return nodes[i];
        }
      }
    }
    throw new IOException("No live nodes contain current block");
  }

当和datanode建立连接时,如果出错。则3秒后(程序hard code)联系namenode重新获取datanode的信息。当重试超过一定次数时,则报错。



建立连接,读取内容
注意这里有一个简单的文件协议。

1
1
分享到:
评论
1 楼 inuyasha027 2013-05-01  
这是一篇很好的文章,学习了,谢谢。

相关推荐

    Starred_Paper_Hadoop_Spark.docx

    实验数据结果表明,由于Spark平台主要基于分布式的内存计算,而Hadoop中的Mapreduce框架在每个map或reduce阶段存在回写或读取硬盘操作,所以Spark的性能优势远远在于Hadoop之上,但前者以使用大量内存进行数据存贮或...

    winutils.exe and hadoop.dll test_ok_on win7_64 python3 spark2.2.0+hadoop2.8.1

    winutils.exe and hadoop.dll 在 win7_64 环境亲测通过,解压的winutils.exe放在...\hadoop-2.8.1\bin\下,hadoop.dll放在...\Windows\system32\下, 我安装的是Java 1.8.0_151,python3.6.3,spark2.2.0+hadoop...

    ducat_hadoop_demos:Hadoop演示

    ducat_hadoop_demos Hadoop演示 A. MsWordCount此项目将允许用户将MS word .doc文件加载到HDFS中,然后使用该文件来计算单词计数示例。 所面对的挑战是- Word文档需要使用来自Apache POI库的单独的记录读取器

    02_Hadoop 分布式文件系统(HDFS).docx

    ■ HDFS 提供什么功能 ■ HDFS 如何读取和写入文件 ■ NameNode 如何使用内存 ■ Hadoop 如何提供文件安全性 ■ 如何使用NameNode Web UI ■ 如何使用Hadoop 文件Shell

    hadoop.dll和winutils.exe

    hadoop本身不支持windows平台,本文件用于在idea中用java代码连接远程hadoop时读取本地文件,以下是用法 1. 将hadoop文件从linux下载到windows硬盘中 2. 添加hadoop文件夹到环境变量:HADOOP_HOME ~\hadoop-2.7.6\sbin;...

    eclipse里运行hadoop程序出现的错误总结

    hadoop读取完毕,在往本地保存文件时出现的错误 在win10中配置hadoop开发环境时没有在c:\windows\system32目录中拷贝hadoop.dll文件 2、 错误: org.apache.hadoop.io.nativeio.NativeIO$Win dows....

    Hadoop环境搭建、配置及通过执行计算来验证的示例

    Hadoop适合于一次计算,多次读取的场景,如搜索引擎,只支持随机读取不支持随机写入,如Hadoop和Lucene的集成就不能够直接集成,因为Lucene支持随机写入。 本文将从使用的角度上谈了如何搭建Hadoop、如何配置...

    hadoop/bin目录文件,含hadoop.dll + winutils.exe

    用于win7开发、调试下读取HDFS文件(包括hive读取),hadoop2.6.3

    Hadoop权威指南 第二版(中文版)

     本书从Hadoop的缘起开始,由浅入深,结合理论和实践,全方位地介绍Hadoop这一高性能处理海量数据集的理想工具。全书共16章,3个附录,涉及的主题包括:Haddoop简介;MapReduce简介;Hadoop分布式文件系统;Hadoop...

    Hadoop从入门到上手企业开发

    039 HDFS文件系统读写流程及HDFS API两种方式读取文件 040 详解HDFS API之FileSystem方式基本操作二 041 讲解分析Configuration和FileSystem类源代码 042 引出HDFS实际应用场景之合并文件和使用getmerge命令并查看...

    hadoop的经典讲义

    必须通过hadoop fs 命令来读取。支持分布式。 MapReduce : 大型分布式数据处理模型,是Google MapReduce的开源实现。 •合并/计算模型。 其他相关组成: •Hbase:结构化分部式数据库。BigTable的开源实现。 •...

    java通过api方式操作Hadoop

    一.文件操作 1.上传本地文件到hadood 2.在hadoop中新建文件,并写入 3.删除hadoop上的文件 4.读取文件 5.文件修改时间 二.目录操作 1.在hadoop上创建目录 2.删除目录 3.读取某个目录下的所有文件 三.hdfs信息 1.查找...

    Hadoop权威指南(中文版)2015上传.rar

    从Hadoop URL中读取数据 通过FileSystem API读取数据 写入数据 目录 查询文件系统 删除数据 数据流 文件读取剖析 文件写入剖析 一致模型 通过 distcp并行拷贝 保持 HDFS 集群的均衡 Hadoop的归档文件 使用Hadoop归档...

    spark/hadoop读取s3所需要的外部依赖包

    aws-java-sdk-1.7.4.jar hadoop-aws-2.7.2.jar 引入代码即可在读写中国区亚马逊s3

    java 从hadoop hdfs读取文件 进行groupby并显示为条形图

    从hadoop hdfs中读取数据,进行groupby 显示统计结果count、avg、max,用文字和柱状图两种图形界面表示

    java大数据案例_8Redis、Thrift、Hadoop2

    1 Redis安装练习 问题 依次完成以下2小题的任务: 1) 安装Redis 2) 配置复制

    基于大数据(Hadoop+Java+MySQL)的数码商城购物推荐系统设计与实现.zip

    合理的为用户做出推荐,推荐的结果可靠程度很高,这就是我的优势所在,因为它和一般的推荐系统的推荐算法不太一样,我的推荐算法是利用Hadoop技术写的,我们可以利用Hadoop集群的高吞吐量,一次读取多次写入等特点...

    Hadoop源码分析(client部分)

    自己写的PPT,详解Hadoop源码及其相关流程

    Hadoop源代码分析(IFile)

    Hadoop中的IFile接口的源代码分析

Global site tag (gtag.js) - Google Analytics