My blog

Clever tagline. Powered by Obtvse

在线学习资源

这是我经常访问的一些在线学习资源:

开源软件培训 http://marakana.com/

coursera-开放大学公开课 https://www.coursera.org/

edx-开放大学公开课 https://www.edx.org/

软件学习网站 我爱学习 http://www.ilovexuexi.com/

交互式编程学习 http://www.codecademy.com/

在线编程学习 http://www.codeschool.com/

在线学习Web开发 http://teamtreehouse.com/

微软虚拟学院 https://www.microsoftvirtualacademy.com

开源在线学习管理平台 claroline http://www.claroline.net/?lang=en

Scala函数编程读书笔记(一)

前几天在Manning网买了本《Functional Programming in Scala》电子书,正好利用春节长假好好学习一下。

这不是一本关于Scala编程的书,而是介绍函数编程(Functional Programming,简称FP)概念和原理的书,虽然是用Scala作为代码示例,但是其原理可以用于任何一种函数编程语言,例如F#、Erlang、clojure等。而一些其他语言如Python、Ruby、Javascript等,对函数编程的支持都很强,一些函数编程原理也可以同样适用于这些语言。


第一章 什么是函数编程

"函数编程"是一种"编程范式"(programming paradigm),也就是如何编写程序的方法论。它基于这样一个简单的限制条件:只用“纯函数”编写程序,换句话说,就是只用没有“副作用”的函数编程。

通常情况下,以下一些操作都将会带来副作用:

  • 修改变量
  • 修改数据结构
  • 设置对象的属性
  • 捕获异常
  • 控制台输入或者输出
  • 读写一个文件
  • 往显示屏上输出

假设一个程序不能做以上任何事情,我们难以想象,这个程序还能有什么用处。如果不能修改变量,那么如何写循环语句呢?函数编程就是告诉我们如何写出不带副作用的程序。事实证明,增加这种限制是有很多好处的,我们编写的程序更加模块化,纯函数的程序更容易测试,重用,并行化。为了获得这些好处,我们必须转变编程方式。

一个函数有一个类型A的输入和B类型的输出(在Scala中表示为A => B),对于每个指定的类型A的值a,都有唯一一个对应的B类型的值b。

例如,intToString就是一个Int => String的函数,对于每个指定的Int类型的数,都输出一个String类型的值。除此之外,它不做任何事情。换句话说,就是没有可以察觉到的其他任何计算返回值以外的操作,我们称这个函数为“纯函数”。其他的一些纯函数比如“+”,对于任何给定的两个整数,每次执行都会返回相同的一个整数。Java或者scala中String类型的length也是如此,其他还有很多这样的纯函数。

纯函数可以用表达式取代另外一个表达式中的值,例如,x=3+5,那么y=x+1 和 y=(3+5)+1 是完全等价的。非纯函数就不是这样了,下面用两个例子说明它们之间的区别。

scala> val x = "Hello, World"
x: java.lang.String = Hello, World
scala> val r1 = x.reverse
r1: String = dlroW ,olleH
scala> val r2 = x.reverse
r2: String = dlroW ,olleH

scala> val r1 = "Hello, World".reverse
r1: String = dlroW ,olleH
val r2 = "Hello, World".reverse
r2: String = dlroW ,olleH

在这个例子中,reverse是一个纯函数。reverse不会有副作用。

scala> val x = new StringBuilder("Hello")
x: java.lang.StringBuilder = Hello
scala> val y = x.append(", World")
y: java.lang.StringBuilder = Hello, World
scala> val r1 = y.toString
r1: java.lang.String = Hello, World
scala> val r2 = y.toString
r2: java.lang.String = Hello, World

scala> val x = new StringBuilder("Hello")
x: java.lang.StringBuilder = Hello
scala> val r1 = x.append(", World").toString
r1: java.lang.String = Hello, World
scala> val r2 = x.append(", World").toString
r2: java.lang.String = Hello, World, World

而在这里例子中,append除了返回以外,还改变了x的值,带来了副作用,因此它不是一个纯函数。

增加了这样限制的函数编程,给我们带来了很大程度的模块化。模块化程序是可以理解的和可以重复使用的组件,它独立于整体。这样,整体的含义仅仅依赖于它的组件和它们之间组合的规则的含义。模块可以看成是一个黑盒子,使得逻辑计算具有可重用性。后面的章节都是讨论如何进行函数编程。

几种基于java/scala的REST框架性能简单测试

测试服务器:DELL R420 CPU Intel(R) Xeon(R) CPU E5-2420 内存 32G

操作系统:centos6.3

JVM: 1.6

测试客户端: 同上

网络环境:100M交换机直连

基准测试: java/Jetty

依赖库:jetty-all-7.6.8, servlet-api-2.5

ab -c 100 -n 1000 Requests per second: 18041.10 [#/sec]

ab -c 200 -n 1000 Requests per second: 17087.59 [#/sec]

ab -c 500 -n 1000 Requests per second: 15134.78 [#/sec]

scala/scalatra

依赖库:Scala 2.9.2, jetty8.1.7

ab -c 100 -n 1000 Requests per second: 1004.86 [#/sec]

ab -c 200 -n 1000 Requests per second: 1093.27 [#/sec]

ab -c 500 -n 1000 Requests per second: 1134.84 [#/sec]

scala/play-mini

依赖库:Scala 2.10.0, akka2.10

ab -c 100 -n 1000 Requests per second: 8328.75 [#/sec]

ab -c 200 -n 1000 Requests per second: 8392.29 [#/sec]

ab -c 500 -n 1000 Requests per second: 7466.20 [#/sec]

scala/spray

依赖库:Scala 2.10.0, spray1.1

ab -c 100 -n 1000 Requests per second: 4564.86 [#/sec]

ab -c 200 -n 1000 Requests per second: 4665.01 [#/sec]

ab -c 500 -n 1000 Requests per second: 4330.07 [#/sec]

java/spring

依赖库:jetty-8.1.1, spring3

ab -c 100 -n 1000 Requests per second: 13579.39 [#/sec]

ab -c 200 -n 1000 Requests per second: 16049.56 [#/sec]

ab -c 500 -n 1000 Requests per second: 14456.09 [#/sec]

java/resteasy

依赖库:jetty-8.1.8,guice

ab -c 100 -n 1000 Requests per second: 13169.16 [#/sec]

ab -c 200 -n 1000 Requests per second: 17738.36 [#/sec]

ab -c 500 -n 1000 Requests per second: 15211.90 [#/sec]

总结,经过简单的Hello,world!测试,发现resteasy和spring的性能最高,接近Jetty直接返回的水平。其次是play-mini ,能达到Jetty一半的水平。scalatra虽然简单,但是性能最低,比较出乎意料,不推荐使用。

Kafka-分布式消息系统

Kafka是Linkedin于2010年12月份开源的消息系统,它主要用于处理活跃的流式数据。活跃的流式数据在web网站应用中非常常见,这些数据包括网站的pv、用户访问了什么内容,搜索了什么内容等。 这些数据通常以日志的形式记录下来,然后每隔一段时间进行一次统计处理。

传统的日志分析系统提供了一种离线处理日志信息的可扩展方案,但若要进行实时处理,通常会有较大延迟。而现有的消(队列)系统能够很好的处理实时或者近似实时的应用,但未处理的数据通常不会写到磁盘上,这对于Hadoop之类(一小时或者一天只处理一部分数据)的离线应用而言,可能存在问题。Kafka正是为了解决以上问题而设计的,它能够很好地离线和在线应用。

  1. http://incubator.apache.org/kafka/downloads.html最新版本源代码。

  2. 解压后,进入目录,执行

    ./sbt update
    ./sbt package
    
  3. 启动服务

    ./bin/zookeeper-server-start.sh config/zookeeper.properties
    ./bin/kafka-server-start.sh config/server.properties
    
  4. 编写producer,连续发送10万条消息

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.javaapi.producer.ProducerData;
    import kafka.producer.ProducerConfig;
    public class producer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zk.connect", "127.0.0.1:2181");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            ProducerConfig config = new ProducerConfig(props);
            Producer<String, String> producer = new Producer<String, String>(config);
            ProducerData<String, String> data ;
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
             for (int i=0;i<=100000;i++)
            {
                data = new ProducerData<String, String>("test", df.format(new Date()));
                producer.send(data);
            }
        }
    }
    
  5. 编写consumer,接收消息

    import java.nio.ByteBuffer;
    import kafka.api.FetchRequest;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.javaapi.message.ByteBufferMessageSet;
    import kafka.message.Message;
    import kafka.message.MessageAndOffset;
    
    public class consumer{
          public static String getMessage(Message message)
          {
            ByteBuffer buffer = message.payload();
            byte [] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            return new String(bytes);
          }
    
        public static void main(String[] args) {
            SimpleConsumer consumer = new SimpleConsumer("127.0.0.1", 9092, 10000, 1024000);
            long offset = 0L;
            while (true) {
              FetchRequest fetchRequest = new FetchRequest("test", 0, 0L, 100000);
              ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
              for (MessageAndOffset messageAndOffset : messages) {
                System.out.println(getMessage(messageAndOffset.message()));
                offset = offset + messageAndOffset.offset(); 
              }
            }
        }
    }
    

经过本机(i5, 8G内存)编译运行测试,接收10万条消息,耗时不超过3秒。

参考资料: http://incubator.apache.org/kafka/quickstart.html

Hadoop学习(六) Scalding编程

Scalding是Twitter开源的一个MapReduce框架。与(Apache)Pig 类似的是,Scalding在MapReduce之上提供了一个抽象层,可以用简洁的句法编写大数据任务。Scalding作为为一个Scala库,建立在Cascading Java库的上层,对Cascading进行了简单封装,旨在让编写Hadoop MapReduce 的工作变得更容易。下面我们来看看用Scalding编写的刚才的WordCount,同样是用scala编程,原来几十行的代码,现在只剩下了短短几行。

import com.twitter.scalding._
class WordCountJob(args : Args) extends Job(args) {
  TextLine( args("input") ).read.
   flatMap('line -> 'word) {line : String => line.split("\\s+")}.
   groupBy('word) { _.size }.
   write( Tsv( args("output") ) )
}

从github上下载源代码:

     git clone https://github.com/twitter/scalding.git

进入scalding路径后,执行:

    sbt update
    sbt sbt assembly

将WordCountJob.scala文件复制到项目路径。执行:

    scripts/scald.rb --local WordCountJob.scala --input README.md --output ./someOutputFile.tsv

就可以看到运行结果。

启动hadoop, 将--local参数改成--hdfs, 输入文件改成hdfs://name:9000/下的文件,则系统自动编译成jar,并且作为一个hadoop的job放到hadoop集群上运行,最后得到的结果保存在hdfs中。

参考资料:https://github.com/twitter/scalding/wiki/Getting-Started

Hadoop学习(五) 使用scala开发Map/Reduce程序

在eclipse中安装scala插件,然后新建一个scala项目,将hadoop安装目录下的jar文件添加到builder路径。 新建WordCount.scala文件:

    import java.io.IOException
    import java.util._
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.conf._
    import org.apache.hadoop.io._
    import org.apache.hadoop.mapred._
    import org.apache.hadoop.util._

    class Map extends MapReduceBase
    with Mapper[LongWritable, Text, Text, IntWritable] {
        private val one = new IntWritable(1);
        private val word = new Text();

        def map(key: LongWritable, value: Text,
            output: OutputCollector[Text, IntWritable],
            reporter: Reporter
        ) { 

            val line = value.toString
            val tokenizer = new StringTokenizer(line)
            while(tokenizer.hasMoreTokens) {
                word.set(tokenizer.nextToken)
                output.collect(word, one)
            }
        }
    }

    class Reduce extends MapReduceBase
    with Reducer[Text, IntWritable, Text, IntWritable] {
        def reduce(key: Text, values: Iterator[IntWritable],
            output: OutputCollector[Text, IntWritable],
            reporter: Reporter
        ) {
            output.collect(key, new IntWritable(count(0, values)))

            def count(sum: Int, vs: Iterator[IntWritable]): Int =
                if(vs.hasNext)
                    count(sum + vs.next.get, vs)
                else
                    sum
        }
    }

    object WordCount {
        def main(args: Array[String]) {
                val conf = new JobConf(this.getClass)
            conf.setJobName("WordCount")
            conf.setOutputKeyClass(classOf[Text])
            conf.setOutputValueClass(classOf[IntWritable])
            conf.setMapperClass(classOf[Map])
            conf.setCombinerClass(classOf[Reduce])
            conf.setReducerClass(classOf[Reduce]) 
            conf.setInputFormat(classOf[TextInputFormat])
            conf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])
            FileInputFormat.setInputPaths(conf, new Path("hdfs://localhost:9000/a01.dat"));
            FileOutputFormat.setOutputPath(conf, new Path("hdfs://localhost:9000/output"));
            JobClient.runJob(conf)
        }
    }

切换到Map/Reduce视图,选择Run As - Run on Hadoop,就可以得到结果。

Hadoop学习(四) 集群环境安装

安装准备:5台KVM虚拟机,4G内存,100G硬盘,最小化安装操作系统 Centos6.3。

  1. 每台机器升级操作系统到最新:

      yum update
    
  2. 每台机器安装hadoop所需JDK软件:

     yum install  java-1.6.0-openjdk
    
  3. 从apache下载最新稳定版hadoop 1.0.3的安装包,复制到每台机器:

     wget http://www.fayea.com/apache-mirror/hadoop/common/hadoop-1.0.3/hadoop-1.0.3-1.x86_64.rpm
    
  4. 每台机器安装hadoop:

     rpm -i hadoop-1.0.3-1.x86_64.rpm
    
  5. 每台机器配置host文件,添加每台机器的机器名和对应的ip地址:

    vi /etc/hosts
    
  6. 配置ssh免密码登陆,每台服务器上运行:

    $ssh-keygen -t rsa -P ""
    $cat~/.ssh/id_dsa.pub>>~/.ssh/authorized_keys
    

    把各台服务器的~/.ssh/authorizedkeys文件内容合并到一个总的authorizedkeys文件;把那个总的authorizedkeys文件scp到每台服务器,替换原有的authorizedkeys文件;互相ssh对方机器,确认ssh登陆不需要密码。

  7. 修改hadoop-env.sh配置文件,设置JAVA_HOME:

    export JAVA_HOME=/usr/lib/jvm/jre

  8. 修改3个hadoop配置文件,将文件复制到所有服务器。

     vi core-site.xml
    <configuration>
    <property>
    <name>fs.default.name</name>
    <value>hdfs://namenode:9000</value>
    </property>
    </configuration>
    
     vi mapred-site.xml
    <configuration>
    <property>
    <name>mapred.job.tracker</name>
    <value>namenode:9001</value>
    </property>
    </configuration>
    
    vi hdfs-site.xml
    <configuration>
    <property>
    <name>dfs.replication</name>
    <value>3</value>
    </property>
    </configuration>
    
  9. 修改masters和slave文件: 在masters中添加第二名称节点的主机名,在slave中添加3个数据节点的主机名。

  10. 格式化 hdfs:

     hadoop  namenode -format  
    
  11. 启动 hadoop:

      start-all.sh
    
  12. 检查hadoop状态:

     hadoop dfsadmin -report
    
  13. 停止hadoop:

    stop-all.sh
    

总结,如果过程中出现错误,通过检查日志查找原因,很可能是连接和权限的问题,需要关闭防火墙和SELinux。

参考资料: http://hadoop.apache.org/common/docs/r1.0.3/cluster_setup.html

Hadoop学习(三) Map/Reduce编程

WordCount是一个简单的应用,它读入文本文件,然后统计出字符出现的频率。输入是文本文件,输出也是文本文件,它的每一行包含了一个字符和它出现的频率,用一个制表符隔开。这是《Hadoop Map/Reduce教程》中的一个入门的Map/Reduce编程例子,可以说是Map/Reduce版的Hello,World.

先随便找一个英文的文本文件,重新命名为a01.dat,通过Upload files to DFS,将a01.dat文件上传到DFS中。

在Eclipse新建项目向导中,新建一个Map/Reduce项目。Map/Reduce项目,包含三个主要文件,一个是Map文件,一个是Reduce文件,还有一个是主文件。源代码如下:

Map.java

    import java.io.IOException;
    import java.util.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.Mapper;

    public class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
     } 

Reduce.java

    import java.io.IOException;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.*;

    public class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) 
          throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
     }

WordCount.java

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class WordCount {

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "wordcount");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/a01.dat"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));
        job.waitForCompletion(true);
        }
    }

选择Run As - Run on Hadoop

运行结果存放在output路径下,可以通过http://localhost:50070/查看。

该程序将文本文件的输入,通过Map函数,转换成一组 key,value 有序对。然后根据key,合并成 key, value1,value2....,再通过Reducer函数,做累加操作,计算出每个单词的出现次数,生成新的key,sum有序对后输出。

手头上有个邮件列表,包含了几万个邮件地址,于是修改了一下map函数,统计各个邮箱的使用情况。修改后的map为:

        public void map(LongWritable key, Text value, Context context) 
             throws IOException, InterruptedException {
             String[] sarray=value.toString().split("@");
             word.set(sarray[1]);
            context.write(word, one);
         }

运行后得到以下结果:

      126.com 17230
      139.com 573
      163.com 35928
      21cn.com  1372
      citiz.net 223
      eyou.com  385
      foxmail.com 143
      gmail.com 2228
      hotmail.com 11021
      live.cn 437
      msn.com 562
      qq.com  22185
      sina.com  9671
      sina.com.cn 540
      sogou.com 222
      sohu.com  4106
      tom.com 2676
      vip.163.com 129
      vip.qq.com  589
      vip.sina.com  355
      vip.sohu.com  285
      yahoo.cn  14607
      yahoo.com 315
      yahoo.com.cn  10770
      yahoo.com.hk  252
      yeah.net  828

参考文章: http://hadoop.apache.org/common/docs/r0.19.2/cn/mapred_tutorial.html

Hadoop学习(二) 配置Hadoop的Eclipse的编程环境

Eclipse版本,Juno (4.2) 。先去网上下载Eclipse的Plugin插件,org.apache.hadoop.eclipse.plugins.1.0.3.jar 放到eclipse/dropins下,重启eclipse会自动找到插件。

在Eclipse的偏好设定-Hadoop下设定本地的Hadoop安装路径。

在Windows->Open Perspective中,选择Other,在弹出框中选择Map/Reduce,点击OK。进入Map/Reduce视图。

点击右键,新建一个Location,在弹出框内设定名称和端口号9000。

点击Finish,完成后,在左边的Project Explore视图中,即可看到已经建立的目录结构。

本地创建一个文件,命名为a01.dat,编辑a01.dat,输入:Hello,hadoop! 保存退出。

在Project Explore视图中,点击右键,选择Upload files to DFS,将a01.dat文件上传。

在新建项目向导中,新建一个Map/Reduce的项目。

在src路径下,新建一个普通的Java类,源代码如下:

  import java.io.FileNotFoundException;  
  import java.io.IOException;  
  import java.net.URI;
  import java.net.URISyntaxException;  
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;  
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;  
  import org.apache.hadoop.io.IOUtils;  
  import org.apache.hadoop.security.AccessControlException;  

  public class test {  
     public static void main(String[] args) throws AccessControlException,  
        FileNotFoundException, IOException, URISyntaxException {  

        String dst = "hdfs://localhost:9000/a01.dat";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        FSDataInputStream hdfsInStream = fs.open(new Path(dst));
    IOUtils.copyBytes(hdfsInStream, System.out, 4090, false);  
   }  
 }  

选择Run As - Run on Hadoop 出现结果:Hello,hadoop!

参考文章: http://zhangxiongfei.iteye.com/blog/1553082

Hadoop学习(一) MAC OS下安装Hadoop开发环境

在MAC OS下安装和配置Hadoop开发环境相当简单。

先去下载最新稳定版本的Hadoop的文件包,下载地址为: http://mirror.bjtu.edu.cn/apache/hadoop/common/hadoop-1.0.3/hadoop-1.0.3-bin.tar.gz

下载后解压复制到/Users 路径下。

在/Users/hadoop-1.0.3/conf 路径下修改以下四个文件:

在core-site.xml 的configuration段中增加

     <property>  
            <name>fs.default.name</name>  
            <value>localhost:9000</value>  
     </property>

在hdfs-site.xml的configuration段中增加

    <property>  
         <name>dfs.replication</name>  
         <value>1</value>  
    </property>

表示使用hdfs分布是文件系统,复制份数为1,在单机上运行。

在mapred-site.xml 的configuration段中增加

    <property> 
        <name>mapred.job.tracker</name>  
        <value>localhost:9001</value>  
    </property>

表示在本机执行jobtracker进程。

在hadoop-env.sh 中增加以下环境变量

    export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Home 
    export HADOOP_INSTALL=/Users/hadoop-1.0.3 

   export PATH=$PATH:$HADOOP_INSTALL/bin 

在MAC OS的系统偏好设置-共享中,允许远程登陆,打开ssh访问。

打开终端,进入/Users/hadoop-1.0.3 路径,执行:

bin/hadoop namenode -format

初始化hdfs文件系统。

最后启动Hadoop。

bin/start-all.sh 

执行jps命令可以查看运行中的Hadoop进程。

通过页面http://localhost:50070 可以查看Hadoop运行状态。