转载

Java 8与Apache Ignite

时间过得真快,转眼间十年过去了,而Java都已经二十岁了,《礼记》有云“二十弱冠、三十而立”,我们首先要感谢JamesGosling和他的团队把Java带到这个世界上来,而年方弱冠,这位翩翩少年郎在IT技术的各个领域都取得了举世瞩目的成绩,在二十年这么长的时间里,已经有无数的企业应用、互联网应用和移动应用都是基于Java构建的,甚至包括物联网上的很多应用也依然使用Java编写,而目前各具特色的编程语言也是层出不穷、百花齐放,也确实给Java造成了不小的压力。面对眼前瞬息万变的IT技术世界,我还是坚信Java到了“而立之年”,仍然可以老骥伏枥为我们带来更多的惊喜!

Java 8与Apache Ignite

杨旭钧

Java8新功能特性

从JDK发布的历程来看,JDK1.6添加了对脚本语言:Ruby、Groovy、JavaScript的支持,JDK7添加了对集合(Collections)的增强支持,而Java8在JDK7之上进一步大胆地创新,引入了Lambda框架、Streams、函数式接口模块等,这里我们重点讲一下Lambda框架。Java8的最大亮点就是引入了Lambda表达式,通过Lambda表达式提供闭包的方式,使集合(Collection)库更加容易遍历、过滤、从一个集合中抽取数据,特别是集合(Collection)操作充分利用多核编程的特性,能够大幅提升并发操作的性能。

什么是Lambda表达式?

Lambda表达式是一个匿名函数,即没有函数名的函数,通常我们用Lambda表达式表示闭包操作。当开发者在编写一个Lambda表达式时,被编译成函数式接口。

Runnable接口实现

代码1是使用Lambda表达式来写一个Runnable接口。

public class RunnableTest {  public static void main(String[] args) {  System.out.println("=== RunnableTest ===");  // Anonymous Runnable  Runnable r1 = new Runnable(){  @Override  public void run(){  S y s t e m . o u t . p r i n t l n ( " H e l l o w o r l d one!");  }  };  // Lambda Runnable  Runnable r2 = () -> System.out. println("Hello world two!");  // Run em!  r1.run();  r2.run(); } }

代码1

Comparator接口实现

代码2是使用Lambda表达式来写一个Comparator接口。Comparator类被用于筛选集合。在下面的例子中,Person对象通过surName筛选出来一个ArrayList列表。如下是包含Person类的域值。

public class Person {  private String givenName;  private String surName;  private int age;  private Gender gender;  private String eMail;  private String phone;  private String address; }

代码2

代码3通过使用匿名内部类和两个Lambda表达式来应用Comparator接口。

public class ComparatorTest {   public static void main(String[] args) {   List<Person> personList = Person. createShortList();   // Sort with Inner Class  Collections.sort(personList, new Comparator<Person>(){  public int compare(Person p1, Person p2){  return p1.getSurName().compareTo(p2. getSurName());  }  });   System.out.println("=== Sorted Asc SurName ===");  for(Person p:personList){  p.printName();  }   // Use Lambda instead   // Print Asc  System.out.println("=== Sorted Asc SurName ===");  Collections.sort(personList, (Person p1, Person p2) -> p1.getSurName().compareTo(p2. getSurName()));   for(Person p:personList){  p.printName();  }  // Print Desc  System.out.println("=== Sorted Desc SurName ===");  Collections.sort(personList, (p1, p 2 ) - > p 2 . g e t S u r N a m e ( ) . c o m p a r e T o ( p 1 . getSurName()));   for(Person p:personList){  p.printName();  }   } }

代码3

我们在后面两个Collections集合中引用了Lambda表达式,Lambda表达式声明了参数类型传递给了表达式。然而,我们从第二个Lambda表达式可以看到,这个参数传递是可选的,Lambda支持“targettyping”,可以智能地从上下文中引用对象类型。因为我们正在分配一个结果给Comparator,而编译器能够将这两个参数作为Person类型来引用。

Listner接口实现

代码4是使用Lambda表达式来写一个Listener接口。

public class ListenerTest {  public static void main(String[] args) {   JButton testButton = new JButton("Test Button");  testButton.addActionListener(new ActionListener(){  @ O v e r r i d e p u b l i c v o i d actionPerformed(ActionEvent ae){  System.out.println("Click Detected by Anon Class");  } });   testButton.addActionListener(e -> System.out.println("Click Detected by Lambda Listner"));  // Swing stuff  JFrame frame = new JFrame("Listener Test");  frame.setDefaultCloseOperation(JFrame. EXIT_ON_CLOSE);  frame.add(testButton, BorderLayout. CENTER);  frame.pack();  frame.setVisible(true);   } }

代码 4

根据示例,Lambda表达式作为一个参数被传递,“targettyping”可用于如下的上下文中:

  • 变量声明
  • 分配
  • 返回语句
  • 数组初始化
  • 方法或构造器参数
  • Lambda表达式方法体
  • 条件表达式
  • Cast表达式

在ApacheIgnite中使用Lambda函数上面我们简单介绍了一下Lambda的基本用法,而在Apache开源项目ApacheIgnite上也支持Lambda方法的使用。在ApacheIgnite的分布式网格中,创建GridInstance实例,grid通过Broadcast(广播)的方式来执行GridRunnable方法中的操作,如代码5所示。

try (Grid grid = GridGain.start()) {  grid.compute().broadcast((GridRunnable)() ->  System.out.println("Hello World")). get(); }

代码 5

在ApacheIgnite中使用时空索引ApacheIgnite能够轻松地使用带有内存索引的SQL语句查询内存中的数据,并且能够扩展SQL到时空查询。例如,代码6的查询将找到地图中的特定平方区域内所有的点:

Polygon square = factory.createPolygon(new Coordinate[] {  new Coordinate(0, 0),  new Coordinate(0, 100),  new Coordinate(100, 100),  new Coordinate(100, 0),  new Coordinate(0, 0) }); cache.queries().  createSqlQuery(MapPoint.class, "select * from MapPoint where location && ?").  queryArguments(square).  execute().get(); }

代码 6

什么是 JavaStreaming?

Java8Stream是元素的集合,这点让 Stream看起来用些类似Iterator,可以支持顺序和并行的对原Stream进行汇聚的操作。

大家可以把 Stream当成一个高级版本的Iterator。用户只要给出需要对其包含的元素执行什么操作,比如“过滤掉长度大于10的字符串”、“获取每个字符串的首字母”等,具体这些操作如何应用到每个元素上。Java8的并行流处理特性也是一大创新亮点,能够让你更简单地使用JavaAPI进行更复杂的数据处理查询,进而优化传统的数据处理查询的性能。

Stream接口和Collect接口

Java8并行流处理类位于 java.util.stream包中。主要的流处理接口有两个:Stream接口和Collect接口。Stream接口定义了典型的流处理应该具备的基本操作,如filter方法、map方法和reduce方法,这三个方法是函数式编程的标志。另外,Stream接口还定义了创建Stream的static方法,Stream或者是有限的、或者是无限的,其中无限的Stream通过Iterate()来实现。

  • IntStream、 LongStream和DoubleStream接口。上述接口是基于原生类型int、long和double的Stream,提供了很多关于流的操作方法。例如,sum()、min()/max()、average()方法等,这些方法都是汇聚过程中的基本操作。
  • Collect接口。 Collect接口是Reduce类的抽象接口,提供了很多常用的Reduce()操作方法。其中Reduce()操作有并行和串行两种实现方式,BaseStream接口中的parallel()、sequential()、unordered()方法,大多数应用程序都使用parallel()方法进行并行流处理操作。
  • Collectors类。 Collectors接口提供了很多可以直接使用的Reduce操作方法。比较典型的方法,例如GroupingBy、PartitioningBy操作。它们都可以通过parallel()、sequential()来执行,例如groupingByConcurrent会使用Parallel()进行grouping操作。
  • StreamSupport类。 StreamSupport类提供了一些底层用于操作Stream的方法,如果不需要创建自己的Stream,一般不需要使用它。JavaStream处理示例代码7是使用JavaStream并行处理和Lambda闭包函数的特性来计算证券投资中资产组合的信用风险,CodeA使用传统的循环迭代方法进行计算,而CodeB使用IntStream调用parallel()方法来执行Integer数据类型的并行流处理操作。
Code A double[] losses = new double[num];  for (int i = 0; i < num; i++)  {  for (Credit crd : portfolio)  {  int remDays = Math.min(crd. getRemainingTerm(), horizon);  if (rndGen.nextDouble() >= 1 - crd.getDefaultProbability(remDays))  losses[i] += (1 + crd. getAnnualRate() * Math.min(horizon, crd. getRemainingTerm()) / 365)  * c r d . getRemainingAmount();  else  losses[i] -= crd. getAnnualRate() * Math.min(horizon, crd. getRemainingTerm()) / 365 *  c r d . getRemainingAmount();  }  }
Code B IntStream.range(0, num).parallel().forEach(i ->  {  for (Credit crd : portfolio)  {  int remDays = Math.min(crd. getRemainingTerm(), horizon);  if (rndGen.nextDouble() >= 1 - crd.getDefaultProbability(remDays))  losses[i] += (1 + crd. getAnnualRate() * Math.min(horizon, crd. getRemainingTerm()) / 365)  * c r d . getRemainingAmount();  else  losses[i] -= crd. getAnnualRate() * Math.min(horizon, crd. getRemainingTerm()) / 365 *  c r d . getRemainingAmount();  }  }  );

代码 7

CodeB中,我们通过parallel()并行执行任务操作,利用多核操作的优势,将获得更多的并发优势。从编程角度上看减少了大量冗余代码,同时也提升了代码执行的效率。ApacheIgnite特性介绍ApacheIgnite目前是Apache组织孵化器下的一个开源项目,由GridGain公司领导,是一个高性能、集成度高的分布式内存平台,能够实时地对大量数据集进行计算和事务处理,同时在最新版本中引入了对Java8Lambda和Streaming的支持。它将数据缓存在内存当中,因此比传统关系型数据库的处理速度更快。

主要功能特性如下:

  • 分布式内存缓存(兼容 JSR-107)
  • 数据操作速度超快
  • 弹性扩展
  • 分布式事务
  • 分布式 SQL查询(兼容SQL-99)
  • JDBC驱动
  • 分布式内存队列和其他数据结构
  • 分层 Off-Heap存储
  • WebSeesion集群
  • HibernateL2缓存集成
  • 兼容 Memcached
  • 基于 HTTP的RESTAPI访问
  • Java8Lambda函数
  • Java8Streaming

Java8发展展望

Java从诞生到现在为止经历了 20年的漫长发展历程,Java固有的编程模式看似不可能会有太大的变化,然而Java8的横空出世使这一情况有了极大改观,JavaLambda和Stream两大特性为垂垂老矣的Java平台注入了强大的活力,更加符合Java开发者们的热切期望,让他能够更自信地迎接云计算和移动应用时代的到来。目前在开源世界中支持Java8新特性的应用框架和软件平台逐渐增多(例如ApacheIgnite工程),相信未来会有越来越多的Java应用迁移到Java8平台上,充分享受Java8新特性带来的舒适体验感。

作者简介: 杨旭钧多年的银行、保险行业的金融系统的规划、设计和开发经验,参与过多个国内外银行、互联网电商与移动支付领域项目。曾就职于 VMware和Accenture两家公司,服务客户包括花旗银行、中国外汇交易中心、中国央行、中国银行等。目前正在创办一家提供大数据分析处理服务的公司。

本文选自程序员电子版2015年5月B刊,该期更多文章请查看 这里 。2000年创刊至今所有文章目录请查看 程序员封面秀 。欢迎 订阅程序员电子版 (含iPad版、Android版、PDF版)。

正文到此结束
Loading...