本文主要研究一下storm的submitTopology
2018-10-08 17:32:55.738 INFO 2870 --- [ main] org.apache.storm.StormSubmitter : Generated ZooKeeper secret payload for MD5-digest: -8659577410336375158:-6351873438041855318 2018-10-08 17:32:55.893 INFO 2870 --- [ main] org.apache.storm.utils.NimbusClient : Found leader nimbus : a391f7a04044:6627 2018-10-08 17:32:56.059 INFO 2870 --- [ main] o.apache.storm.security.auth.AuthUtils : Got AutoCreds [] 2018-10-08 17:32:56.073 INFO 2870 --- [ main] org.apache.storm.utils.NimbusClient : Found leader nimbus : a391f7a04044:6627 2018-10-08 17:32:56.123 INFO 2870 --- [ main] org.apache.storm.StormSubmitter : Uploading dependencies - jars... 2018-10-08 17:32:56.125 INFO 2870 --- [ main] org.apache.storm.StormSubmitter : Uploading dependencies - artifacts... 2018-10-08 17:32:56.125 INFO 2870 --- [ main] org.apache.storm.StormSubmitter : Dependency Blob keys - jars : [] / artifacts : [] 2018-10-08 17:32:56.149 INFO 2870 --- [ main] org.apache.storm.StormSubmitter : Uploading topology jar /tmp/storm-demo/target/storm-demo-0.0.1-SNAPSHOT.jar to assigned location: /data/nimbus/inbox/stormjar-4ead82bb-74a3-45a3-aca4-3af2f1d23998.jar 2018-10-08 17:32:57.105 INFO 2870 --- [ main] org.apache.storm.StormSubmitter : Successfully uploaded topology jar to assigned location: /data/nimbus/inbox/stormjar-4ead82bb-74a3-45a3-aca4-3af2f1d23998.jar 2018-10-08 17:32:57.106 INFO 2870 --- [ main] org.apache.storm.StormSubmitter : Submitting topology DemoTopology in distributed mode with conf {"nimbus.seeds":["192.168.99.100"],"storm.zookeeper.topology.auth.scheme":"digest","topology.workers":1,"storm.zookeeper.port":2181,"nimbus.thrift.port":6627,"storm.zookeeper.topology.auth.payload":"-8659577410336375158:-6351873438041855318","storm.zookeeper.servers":["192.168.99.100"]} 2018-10-08 17:32:58.008 INFO 2870 --- [ main] org.apache.storm.StormSubmitter : Finished submitting topology: DemoTopology
storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java
public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { submitTopology(name, stormConf, topology, null, null); } public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { submitTopologyAs(name, stormConf, topology, opts, progressListener, null); } public static void submitTopologyAs(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IllegalArgumentException { if(!Utils.isValidConf(stormConf)) { throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable"); } stormConf = new HashMap(stormConf); stormConf.putAll(Utils.readCommandLineOpts()); Map conf = Utils.readStormConfig(); conf.putAll(stormConf); stormConf.putAll(prepareZookeeperAuthentication(conf)); validateConfs(conf, topology); Map<String,String> passedCreds = new HashMap<>(); if (opts != null) { Credentials tmpCreds = opts.get_creds(); if (tmpCreds != null) { passedCreds = tmpCreds.get_creds(); } } Map<String,String> fullCreds = populateCredentials(conf, passedCreds); if (!fullCreds.isEmpty()) { if (opts == null) { opts = new SubmitOptions(TopologyInitialStatus.ACTIVE); } opts.set_creds(new Credentials(fullCreds)); } try { if (localNimbus!=null) { LOG.info("Submitting topology " + name + " in local mode"); if (opts!=null) { localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts); } else { // this is for backwards compatibility localNimbus.submitTopology(name, stormConf, topology); } LOG.info("Finished submitting topology: " + name); } else { String serConf = JSONValue.toJSONString(stormConf); try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) { if (topologyNameExists(name, client)) { throw new RuntimeException("Topology with name `" + name + "` already exists on cluster"); } // Dependency uploading only makes sense for distributed mode List<String> jarsBlobKeys = Collections.emptyList(); List<String> artifactsBlobKeys; DependencyUploader uploader = new DependencyUploader(); try { uploader.init(); jarsBlobKeys = uploadDependencyJarsToBlobStore(uploader); artifactsBlobKeys = uploadDependencyArtifactsToBlobStore(uploader); } catch (Throwable e) { // remove uploaded jars blobs, not artifacts since they're shared across the cluster uploader.deleteBlobs(jarsBlobKeys); uploader.shutdown(); throw e; } try { setDependencyBlobsToTopology(topology, jarsBlobKeys, artifactsBlobKeys); submitTopologyInDistributeMode(name, topology, opts, progressListener, asUser, conf, serConf, client); } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { // remove uploaded jars blobs, not artifacts since they're shared across the cluster // Note that we don't handle TException to delete jars blobs // because it's safer to leave some blobs instead of topology not running uploader.deleteBlobs(jarsBlobKeys); throw e; } finally { uploader.shutdown(); } } } } catch(TException e) { throw new RuntimeException(e); } invokeSubmitterHook(name, asUser, conf, topology); } private static void submitTopologyInDistributeMode(String name, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser, Map conf, String serConf, NimbusClient client) throws TException { try { String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, client); LOG.info("Submitting topology {} in distributed mode with conf {}", name, serConf); if (opts != null) { client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts); } else { // this is for backwards compatibility client.getClient().submitTopology(name, jar, serConf, topology); } LOG.info("Finished submitting topology: {}", name); } catch (InvalidTopologyException e) { LOG.warn("Topology submission exception: {}", e.get_msg()); throw e; } catch (AlreadyAliveException e) { LOG.warn("Topology already alive exception", e); throw e; } } public static String submitJarAs(Map conf, String localJar, ProgressListener listener, NimbusClient client) { if (localJar == null) { throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload."); } try { String uploadLocation = client.getClient().beginFileUpload(); LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation); BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES); long totalSize = new File(localJar).length(); if (listener != null) { listener.onStart(localJar, uploadLocation, totalSize); } long bytesUploaded = 0; while(true) { byte[] toSubmit = is.read(); bytesUploaded += toSubmit.length; if (listener != null) { listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize); } if(toSubmit.length==0) break; client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit)); } client.getClient().finishFileUpload(uploadLocation); if (listener != null) { listener.onCompleted(localJar, uploadLocation, totalSize); } LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation); return uploadLocation; } catch(Exception e) { throw new RuntimeException(e); } }
storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java
private static List<String> uploadDependencyJarsToBlobStore(DependencyUploader uploader) { LOG.info("Uploading dependencies - jars..."); DependencyPropertiesParser propertiesParser = new DependencyPropertiesParser(); String depJarsProp = System.getProperty("storm.dependency.jars", ""); List<File> depJars = propertiesParser.parseJarsProperties(depJarsProp); try { return uploader.uploadFiles(depJars, true); } catch (Throwable e) { throw new RuntimeException(e); } }
storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java
private static List<String> uploadDependencyArtifactsToBlobStore(DependencyUploader uploader) { LOG.info("Uploading dependencies - artifacts..."); DependencyPropertiesParser propertiesParser = new DependencyPropertiesParser(); String depArtifactsProp = System.getProperty("storm.dependency.artifacts", "{}"); Map<String, File> depArtifacts = propertiesParser.parseArtifactsProperties(depArtifactsProp); try { return uploader.uploadArtifacts(depArtifacts); } catch (Throwable e) { throw new RuntimeException(e); } }
storm-core-1.1.0-sources.jar!/org/apache/storm/dependency/DependencyUploader.java
public List<String> uploadFiles(List<File> dependencies, boolean cleanupIfFails) throws IOException, AuthorizationException { checkFilesExist(dependencies); List<String> keys = new ArrayList<>(dependencies.size()); try { for (File dependency : dependencies) { String fileName = dependency.getName(); String key = BlobStoreUtils.generateDependencyBlobKey(BlobStoreUtils.applyUUIDToFileName(fileName)); try { uploadDependencyToBlobStore(key, dependency); } catch (KeyAlreadyExistsException e) { // it should never happened since we apply UUID throw new RuntimeException(e); } keys.add(key); } } catch (Throwable e) { if (getBlobStore() != null && cleanupIfFails) { deleteBlobs(keys); } throw new RuntimeException(e); } return keys; } public List<String> uploadArtifacts(Map<String, File> artifacts) { checkFilesExist(artifacts.values()); List<String> keys = new ArrayList<>(artifacts.size()); try { for (Map.Entry<String, File> artifactToFile : artifacts.entrySet()) { String artifact = artifactToFile.getKey(); File dependency = artifactToFile.getValue(); String key = BlobStoreUtils.generateDependencyBlobKey(convertArtifactToJarFileName(artifact)); try { uploadDependencyToBlobStore(key, dependency); } catch (KeyAlreadyExistsException e) { // we lose the race, but it doesn't matter } keys.add(key); } } catch (Throwable e) { throw new RuntimeException(e); } return keys; } private boolean uploadDependencyToBlobStore(String key, File dependency) throws KeyAlreadyExistsException, AuthorizationException, IOException { boolean uploadNew = false; try { // FIXME: we can filter by listKeys() with local blobstore when STORM-1986 is going to be resolved // as a workaround, we call getBlobMeta() for all keys getBlobStore().getBlobMeta(key); } catch (KeyNotFoundException e) { // TODO: do we want to add ACL here? AtomicOutputStream blob = getBlobStore() .createBlob(key, new SettableBlobMeta(new ArrayList<AccessControl>())); Files.copy(dependency.toPath(), blob); blob.close(); uploadNew = true; } return uploadNew; }
storm-core-1.1.0-sources.jar!/org/apache/storm/blobstore/NimbusBlobStore.java
public class NimbusUploadAtomicOutputStream extends AtomicOutputStream { private String session; private int maxChunkSize = 4096; private String key; public NimbusUploadAtomicOutputStream(String session, int bufferSize, String key) { this.session = session; this.maxChunkSize = bufferSize; this.key = key; } @Override public void cancel() throws IOException { try { synchronized(client) { client.getClient().cancelBlobUpload(session); } } catch (TException e) { throw new RuntimeException(e); } } @Override public void write(int b) throws IOException { try { synchronized(client) { client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(new byte[] {(byte)b})); } } catch (TException e) { throw new RuntimeException(e); } } @Override public void write(byte []b) throws IOException { write(b, 0, b.length); } @Override public void write(byte []b, int offset, int len) throws IOException { try { int end = offset + len; for (int realOffset = offset; realOffset < end; realOffset += maxChunkSize) { int realLen = Math.min(end - realOffset, maxChunkSize); LOG.debug("Writing {} bytes of {} remaining",realLen,(end-realOffset)); synchronized(client) { client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(b, realOffset, realLen)); } } } catch (TException e) { throw new RuntimeException(e); } } @Override public void close() throws IOException { try { synchronized(client) { client.getClient().finishBlobUpload(session); client.getClient().createStateInZookeeper(key); } } catch (TException e) { throw new RuntimeException(e); } } }
storm-core-1.1.0-sources.jar!/org/apache/storm/generated/Nimbus.java
public String beginFileUpload() throws AuthorizationException, org.apache.thrift.TException { send_beginFileUpload(); return recv_beginFileUpload(); } public void send_beginFileUpload() throws org.apache.thrift.TException { beginFileUpload_args args = new beginFileUpload_args(); sendBase("beginFileUpload", args); } public String recv_beginFileUpload() throws AuthorizationException, org.apache.thrift.TException { beginFileUpload_result result = new beginFileUpload_result(); receiveBase(result, "beginFileUpload"); if (result.is_set_success()) { return result.success; } if (result.aze != null) { throw result.aze; } throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "beginFileUpload failed: unknown result"); } public void send_finishFileUpload(String location) throws org.apache.thrift.TException { finishFileUpload_args args = new finishFileUpload_args(); args.set_location(location); sendBase("finishFileUpload", args); } public void uploadChunk(String location, ByteBuffer chunk) throws AuthorizationException, org.apache.thrift.TException { send_uploadChunk(location, chunk); recv_uploadChunk(); } public void send_uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift.TException { uploadChunk_args args = new uploadChunk_args(); args.set_location(location); args.set_chunk(chunk); sendBase("uploadChunk", args); } public void recv_uploadChunk() throws AuthorizationException, org.apache.thrift.TException { uploadChunk_result result = new uploadChunk_result(); receiveBase(result, "uploadChunk"); if (result.aze != null) { throw result.aze; } return; } public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, org.apache.thrift.TException { send_submitTopology(name, uploadedJarLocation, jsonConf, topology); recv_submitTopology(); } public void send_submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws org.apache.thrift.TException { submitTopology_args args = new submitTopology_args(); args.set_name(name); args.set_uploadedJarLocation(uploadedJarLocation); args.set_jsonConf(jsonConf); args.set_topology(topology); sendBase("submitTopology", args); } public void recv_submitTopology() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, org.apache.thrift.TException { submitTopology_result result = new submitTopology_result(); receiveBase(result, "submitTopology"); if (result.e != null) { throw result.e; } if (result.ite != null) { throw result.ite; } if (result.aze != null) { throw result.aze; } return; } public void uploadBlobChunk(String session, ByteBuffer chunk) throws AuthorizationException, org.apache.thrift.TException { send_uploadBlobChunk(session, chunk); recv_uploadBlobChunk(); } public void send_uploadBlobChunk(String session, ByteBuffer chunk) throws org.apache.thrift.TException { uploadBlobChunk_args args = new uploadBlobChunk_args(); args.set_session(session); args.set_chunk(chunk); sendBase("uploadBlobChunk", args); } public void recv_uploadBlobChunk() throws AuthorizationException, org.apache.thrift.TException { uploadBlobChunk_result result = new uploadBlobChunk_result(); receiveBase(result, "uploadBlobChunk"); if (result.aze != null) { throw result.aze; } return; }
storm的submitTopology会先上传storm.dependency.jars指定的依赖jar,再上传storm.dependency.artifacts指定的依赖,最后再上传指定的jar包,他们都是通过远程方法sendBase发送数据以及receiveBase接收数据。