.
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── dalong
        │           └── Application.java
        └── resources
            └── mybooks
                └── books
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.dalong</groupId>
    <artifactId>myid</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <encoding>UTF-8</encoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Hazelcast Jet engine (Jet, Pipeline, Sources, Sinks, WindowDefinition used by Application).
             4.2 matches the engine version reported by the example runs ("Hazelcast Jet 4.2"). -->
        <dependency>
            <groupId>com.hazelcast.jet</groupId>
            <artifactId>hazelcast-jet</artifactId>
            <version>4.2</version>
        </dependency>
    </dependencies>
</project>
demo app rong fengliang rong demo app rong rong
package com.dalong; import com.hazelcast.jet.Jet; import com.hazelcast.jet.JetInstance; import com.hazelcast.jet.Traversers; import com.hazelcast.jet.aggregate.AggregateOperations; import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.Sources; import com.hazelcast.jet.pipeline.WindowDefinition; public class Application { public static void main(String[] args) { String path = "src/main/resources/mybooks"; // batch mode BatchMode(path); // stream mode // StreamMode(path); } // private static void StreamMode(String path) { JetInstance jet = Jet.bootstrappedInstance(); Pipeline p = Pipeline.create(); p.readFrom(Sources.fileWatcher(path)) .withIngestionTimestamps() .setLocalParallelism(1) .flatMap(line -> Traversers.traverseArray(line.toLowerCase().split("\\\\W+"))) .filter(word -> !word.isEmpty()) .groupingKey(word -> word) .window(WindowDefinition.tumbling(4)) .aggregate(AggregateOperations.counting()) .writeTo(Sinks.logger()); jet.newJob(p).join(); } private static void BatchMode(String path) { JetInstance jet = Jet.bootstrappedInstance(); Pipeline p = Pipeline.create(); p.readFrom(Sources.files(path)) .flatMap(line -> Traversers.traverseArray(line.toLowerCase().split("\\W+"))) .filter(word -> !word.isEmpty()) .groupingKey(word -> word) .aggregate(AggregateOperations.counting()) .writeTo(Sinks.logger()); jet.newJob(p).join(); } }
14:18:02.150 [ INFO] [c.h.j.Jet] Bootstrapped instance requested but application wasn't called from jet submit script. Creating a standalone Jet instance instead. 14:18:02.278 [ INFO] [c.h.i.c.AbstractConfigLocator] Loading 'hazelcast-jet-default.yaml' from the classpath. 14:18:02.299 [ INFO] [c.h.j.c.JetConfig] jet.home is /Users/dalong/Downloads/myid 14:18:02.761 [ INFO] [c.h.i.c.AbstractConfigLocator] Loading 'hazelcast-jet-member-default.yaml' from the classpath. 14:18:03.017 [ INFO] [c.h.i.AddressPicker] Prefer IPv4 stack is true, prefer IPv6 addresses is false 14:18:03.078 [ INFO] [c.h.i.AddressPicker] Picked [10.6.205.88]:5701, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5701], bind any local is true 14:18:03.186 [ INFO] [c.h.system] Hazelcast Jet 4.2 (20200714 - 0e841ef) starting at [10.6.205.88]:5701 14:18:03.188 [ INFO] [c.h.system] Cluster name: jet 14:18:03.189 [ INFO] [c.h.system] o o o o---o o---o o o---o o o---o o-o-o o o---o o-o-o | | / \ / | | | / \ | | | | | o---o o---o o o-o | o o---o o---o | | o-o | | | | | / | | | | | | | \ | | | o o o o o---o o---o o---o o---o o o o---o o o--o o---o o 14:18:03.191 [ INFO] [c.h.system] Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. 14:18:04.425 [ INFO] [c.h.s.i.o.i.BackpressureRegulator] Backpressure is disabled 14:18:05.926 [ WARN] [c.h.c.CPSubsystem] CP Subsystem is not enabled. CP data structures will operate in UNSAFE mode! Please note that UNSAFE mode will not provide strong consistency guarantees. 
14:18:07.140 [ INFO] [c.h.j.i.e.TaskletExecutionService] Creating idler with jet.idle.cooperative.min.microseconds=25µs,jet.idle.cooperative.max.microseconds=500µs 14:18:07.143 [ INFO] [c.h.j.i.e.TaskletExecutionService] Creating idler with jet.idle.noncooperative.min.microseconds=25µs,jet.idle.noncooperative.max.microseconds=5000µs 14:18:07.373 [ INFO] [c.h.j.i.JetService] Setting number of cooperative threads and default parallelism to 8 14:18:07.386 [ INFO] [c.h.s.i.o.i.OperationExecutorImpl] Starting 8 partition threads and 5 generic threads (1 dedicated for priority tasks) 14:18:07.391 [ INFO] [c.h.i.d.Diagnostics] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments. 14:18:07.416 [ INFO] [c.h.c.LifecycleService] [10.6.205.88]:5701 is STARTING 14:18:07.487 [ WARN] [c.h.i.i.Node] No join method is enabled! Starting standalone. 14:18:07.534 [ INFO] [c.h.c.LifecycleService] [10.6.205.88]:5701 is STARTED 14:18:22.713 [ INFO] [c.h.i.p.i.PartitionStateManager] Initializing cluster partition table arrangement... 
14:18:22.910 [ INFO] [c.h.j.i.JobCoordinationService] Starting job 04a8-86cc-9300-0001 based on submit request 14:18:22.930 [ INFO] [c.h.j.i.MasterJobContext] Didn't find any snapshot to restore for job '04a8-86cc-9300-0001', execution 04a8-86cc-9301-0001 14:18:22.931 [ INFO] [c.h.j.i.MasterJobContext] Start executing job '04a8-86cc-9300-0001', execution 04a8-86cc-9301-0001, execution graph in DOT format: digraph DAG { "filesSource(src/main/resources/mybooks/*)" [localParallelism=4]; "fused(flat-map, filter)" [localParallelism=8]; "group-and-aggregate-prepare" [localParallelism=8]; "group-and-aggregate" [localParallelism=8]; "loggerSink" [localParallelism=1]; "filesSource(src/main/resources/mybooks/*)" -> "fused(flat-map, filter)" [queueSize=1024]; "fused(flat-map, filter)" -> "group-and-aggregate-prepare" [label="partitioned", queueSize=1024]; subgraph cluster_0 { "group-and-aggregate-prepare" -> "group-and-aggregate" [label="distributed-partitioned", queueSize=1024]; } "group-and-aggregate" -> "loggerSink" [queueSize=1024]; } HINT: You can use graphviz or http://viz-js.com to visualize the printed graph. 14:18:23.050 [ INFO] [c.h.j.i.JobExecutionService] Execution plan for jobId=04a8-86cc-9300-0001, jobName='04a8-86cc-9300-0001', executionId=04a8-86cc-9301-0001 initialized 14:18:23.056 [ INFO] [c.h.j.i.JobExecutionService] Start execution of job '04a8-86cc-9300-0001', execution 04a8-86cc-9301-0001 from coordinator [10.6.205.88]:5701 14:18:23.113 [ INFO] [c.h.j.i.c.W.loggerSink#0] rong=4 14:18:23.114 [ INFO] [c.h.j.i.c.W.loggerSink#0] fengliang=1 14:18:23.114 [ INFO] [c.h.j.i.c.W.loggerSink#0] app=2 14:18:23.114 [ INFO] [c.h.j.i.c.W.loggerSink#0] demo=2 14:18:23.121 [ INFO] [c.h.j.i.MasterJobContext] Execution of job '04a8-86cc-9300-0001', execution 04a8-86cc-9301-0001 completed successfully Start time: 2020-07-15T14:18:22.912 Duration: 208 ms To see additional job metrics enable JobConfig.storeMetricsAfterJobCompletion
public static void main(String[] args) { String path = "src/main/resources/mybooks"; // batch mode // BatchMode(path); // stream mode StreamMode(path); }
14:59:23.648 [ INFO] [c.h.j.Jet] Bootstrapped instance requested but application wasn't called from jet submit script. Creating a standalone Jet instance instead. 14:59:23.733 [ INFO] [c.h.i.c.AbstractConfigLocator] Loading 'hazelcast-jet-default.yaml' from the classpath. 14:59:23.737 [ INFO] [c.h.j.c.JetConfig] jet.home is /Users/dalong/Downloads/myid 14:59:24.023 [ INFO] [c.h.i.c.AbstractConfigLocator] Loading 'hazelcast-jet-member-default.yaml' from the classpath. 14:59:24.116 [ INFO] [c.h.i.AddressPicker] Prefer IPv4 stack is true, prefer IPv6 addresses is false 14:59:24.156 [ INFO] [c.h.i.AddressPicker] Picked [10.6.205.88]:5701, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5701], bind any local is true 14:59:24.203 [ INFO] [c.h.system] Hazelcast Jet 4.2 (20200714 - 0e841ef) starting at [10.6.205.88]:5701 14:59:24.204 [ INFO] [c.h.system] Cluster name: jet 14:59:24.205 [ INFO] [c.h.system] o o o o---o o---o o o---o o o---o o-o-o o o---o o-o-o | | / \ / | | | / \ | | | | | o---o o---o o o-o | o o---o o---o | | o-o | | | | | / | | | | | | | \ | | | o o o o o---o o---o o---o o---o o o o---o o o--o o---o o 14:59:24.205 [ INFO] [c.h.system] Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. 14:59:24.523 [ INFO] [c.h.s.i.o.i.BackpressureRegulator] Backpressure is disabled 14:59:24.830 [ WARN] [c.h.c.CPSubsystem] CP Subsystem is not enabled. CP data structures will operate in UNSAFE mode! Please note that UNSAFE mode will not provide strong consistency guarantees. 
14:59:25.218 [ INFO] [c.h.j.i.e.TaskletExecutionService] Creating idler with jet.idle.cooperative.min.microseconds=25µs,jet.idle.cooperative.max.microseconds=500µs 14:59:25.219 [ INFO] [c.h.j.i.e.TaskletExecutionService] Creating idler with jet.idle.noncooperative.min.microseconds=25µs,jet.idle.noncooperative.max.microseconds=5000µs 14:59:25.281 [ INFO] [c.h.j.i.JetService] Setting number of cooperative threads and default parallelism to 8 14:59:25.285 [ INFO] [c.h.s.i.o.i.OperationExecutorImpl] Starting 8 partition threads and 5 generic threads (1 dedicated for priority tasks) 14:59:25.287 [ INFO] [c.h.i.d.Diagnostics] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments. 14:59:25.297 [ INFO] [c.h.c.LifecycleService] [10.6.205.88]:5701 is STARTING 14:59:25.322 [ WARN] [c.h.i.i.Node] No join method is enabled! Starting standalone. 14:59:25.336 [ INFO] [c.h.c.LifecycleService] [10.6.205.88]:5701 is STARTED 14:59:25.562 [ INFO] [c.h.i.p.i.PartitionStateManager] Initializing cluster partition table arrangement... 
14:59:25.685 [ INFO] [c.h.j.i.JobCoordinationService] Starting job 04a8-9031-ad80-0001 based on submit request 14:59:25.700 [ INFO] [c.h.j.i.MasterJobContext] Didn't find any snapshot to restore for job '04a8-9031-ad80-0001', execution 04a8-9031-ad81-0001 14:59:25.700 [ INFO] [c.h.j.i.MasterJobContext] Start executing job '04a8-9031-ad80-0001', execution 04a8-9031-ad81-0001, execution graph in DOT format: digraph DAG { "fileWatcherSource(src/main/resources/mybooks/*)" [localParallelism=1]; "fileWatcherSource(src/main/resources/mybooks/*)-add-timestamps" [localParallelism=1]; "fused(flat-map, filter)" [localParallelism=8]; "sliding-window-prepare" [localParallelism=8]; "sliding-window" [localParallelism=8]; "loggerSink" [localParallelism=1]; "fileWatcherSource(src/main/resources/mybooks/*)" -> "fileWatcherSource(src/main/resources/mybooks/*)-add-timestamps" [label="isolated", queueSize=1024]; "fileWatcherSource(src/main/resources/mybooks/*)-add-timestamps" -> "fused(flat-map, filter)" [queueSize=1024]; "fused(flat-map, filter)" -> "sliding-window-prepare" [label="partitioned", queueSize=1024]; subgraph cluster_0 { "sliding-window-prepare" -> "sliding-window" [label="distributed-partitioned", queueSize=1024]; } "sliding-window" -> "loggerSink" [queueSize=1024]; } HINT: You can use graphviz or http://viz-js.com to visualize the printed graph. 
14:59:25.838 [ INFO] [c.h.j.i.JobExecutionService] Execution plan for jobId=04a8-9031-ad80-0001, jobName='04a8-9031-ad80-0001', executionId=04a8-9031-ad81-0001 initialized 14:59:25.856 [ INFO] [c.h.j.i.JobExecutionService] Start execution of job '04a8-9031-ad80-0001', execution 04a8-9031-ad81-0001 from coordinator [10.6.205.88]:5701 14:59:25.935 [ INFO] [c.h.j.i.c.S.fileWatcherSource(src/main/resources/mybooks/*)#0] Started to watch directory: src/main/resources/mybooks 14:59:41.981 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=14:59:41.936, end=14:59:41.940, key='rong', value='21', isEarly=false} 14:59:41.982 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=14:59:41.936, end=14:59:41.940, key='fengliang', value='3', isEarly=false} 14:59:41.982 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=14:59:41.936, end=14:59:41.940, key='app', value='7', isEarly=false} 14:59:41.982 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=14:59:41.936, end=14:59:41.940, key='emo', value='3', isEarly=false} 14:59:41.983 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=14:59:41.936, end=14:59:41.940, key='dmo', value='3', isEarly=false} 14:59:41.983 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=14:59:41.936, end=14:59:41.940, key='demo', value='22', isEarly=false}