默认采用的是org.activiti.engine.impl.db.DbIdGenerator
/**
 * {@link IdGenerator} that hands out numeric ids from an in-memory range and asks the
 * command executor for a new range (via {@link GetNextIdBlockCmd}) once the current
 * range is used up.
 *
 * <p>The initial state ({@code nextId = 0}, {@code lastId = -1}) makes the very first
 * call to {@link #getNextId()} fetch a block.
 */
public class DbIdGenerator implements IdGenerator {

    /** Number of ids requested per block. */
    protected int idBlockSize;
    /** Next id to hand out. */
    protected long nextId = 0;
    /** Last id of the current block (inclusive). */
    protected long lastId = -1;

    protected CommandExecutor commandExecutor;
    protected CommandConfig commandConfig;

    /**
     * Returns the next id as a decimal string, fetching a new block first when the
     * current one has been exhausted.
     *
     * @return the next id
     */
    public synchronized String getNextId() {
        boolean blockExhausted = nextId > lastId;
        if (blockExhausted) {
            getNewBlock();
        }
        String id = Long.toString(nextId);
        nextId++;
        return id;
    }

    /**
     * Executes a {@link GetNextIdBlockCmd} and adopts the bounds of the returned block.
     */
    protected synchronized void getNewBlock() {
        IdBlock freshBlock = commandExecutor.execute(commandConfig, new GetNextIdBlockCmd(idBlockSize));
        nextId = freshBlock.getNextId();
        lastId = freshBlock.getLastId();
    }

    public int getIdBlockSize() {
        return idBlockSize;
    }

    public void setIdBlockSize(int idBlockSize) {
        this.idBlockSize = idBlockSize;
    }

    public CommandExecutor getCommandExecutor() {
        return commandExecutor;
    }

    public void setCommandExecutor(CommandExecutor commandExecutor) {
        this.commandExecutor = commandExecutor;
    }

    public CommandConfig getCommandConfig() {
        return commandConfig;
    }

    public void setCommandConfig(CommandConfig commandConfig) {
        this.commandConfig = commandConfig;
    }
}
每次通过GetNextIdBlockCmd从数据库中取出一段id(大小为idBlockSize)
/**
 * Command that reserves a block of ids by advancing the {@code next.dbid} property.
 */
public class GetNextIdBlockCmd implements Command<IdBlock> {

    private static final long serialVersionUID = 1L;

    /** Number of ids to reserve per execution. */
    protected int idBlockSize;

    public GetNextIdBlockCmd(int idBlockSize) {
        this.idBlockSize = idBlockSize;
    }

    /**
     * Reads the current value of the {@code next.dbid} property, sets the property to
     * that value plus {@code idBlockSize}, and returns the range in between.
     *
     * @param commandContext context used to look up the property entity
     * @return block spanning {@code [old value, old value + idBlockSize - 1]}
     */
    public IdBlock execute(CommandContext commandContext) {
        PropertyEntity nextDbidProperty =
            (PropertyEntity) commandContext.getPropertyEntityManager().findPropertyById("next.dbid");

        long blockStart = Long.parseLong(nextDbidProperty.getValue());
        long followingBlockStart = blockStart + idBlockSize;

        // Only the entity is modified here; this method issues no explicit database write.
        nextDbidProperty.setValue(Long.toString(followingBlockStart));

        long blockEnd = followingBlockStart - 1;
        return new IdBlock(blockStart, blockEnd);
    }
}
在CommandContextInterceptor里头,拦截了命令
public class CommandContextInterceptor extends AbstractCommandInterceptor { private static final Logger log = LoggerFactory.getLogger(CommandContextInterceptor.class); protected CommandContextFactory commandContextFactory; protected ProcessEngineConfigurationImpl processEngineConfiguration; public CommandContextInterceptor() { } public CommandContextInterceptor(CommandContextFactory commandContextFactory, ProcessEngineConfigurationImpl processEngineConfiguration) { this.commandContextFactory = commandContextFactory; this.processEngineConfiguration = processEngineConfiguration; } public <T> T execute(CommandConfig config, Command<T> command) { CommandContext context = Context.getCommandContext(); boolean contextReused = false; // We need to check the exception, because the transaction can be in a rollback state, // and some other command is being fired to compensate (eg. decrementing job retries) if (!config.isContextReusePossible() || context == null || context.getException() != null) { context = commandContextFactory.createCommandContext(command); } else { log.debug("Valid context found. 
Reusing it for the current command '{}'", command.getClass().getCanonicalName()); contextReused = true; } try { // Push on stack Context.setCommandContext(context); Context.setProcessEngineConfiguration(processEngineConfiguration); return next.execute(config, command); } catch (Exception e) { context.exception(e); } finally { try { if (!contextReused) { context.close(); } } finally { // Pop from stack Context.removeCommandContext(); Context.removeProcessEngineConfiguration(); Context.removeBpmnOverrideContext(); } } return null; } public CommandContextFactory getCommandContextFactory() { return commandContextFactory; } public void setCommandContextFactory(CommandContextFactory commandContextFactory) { this.commandContextFactory = commandContextFactory; } public ProcessEngineConfigurationImpl getProcessEngineConfiguration() { return processEngineConfiguration; } public void setProcessEngineContext(ProcessEngineConfigurationImpl processEngineContext) { this.processEngineConfiguration = processEngineContext; }
在其execute方法的finally块中,如果上下文不是复用的,会调用context.close()方法
public void close() { // the intention of this method is that all resources are closed properly, even // if exceptions occur in close or flush methods of the sessions or the // transaction context. try { try { try { if (exception == null && closeListeners != null) { try { for (CommandContextCloseListener listener : closeListeners) { listener.closing(this); } } catch (Throwable exception) { exception(exception); } } if (exception == null) { flushSessions(); } } catch (Throwable exception) { exception(exception); } finally { try { if (exception == null) { transactionContext.commit(); } } catch (Throwable exception) { exception(exception); } if (exception == null && closeListeners != null) { try { for (CommandContextCloseListener listener : closeListeners) { listener.closed(this); } } catch (Throwable exception) { exception(exception); } } if (exception != null) { if (exception instanceof JobNotFoundException || exception instanceof ActivitiTaskAlreadyClaimedException) { // reduce log level, because this may have been caused because of job deletion due to cancelActiviti="true" log.info("Error while closing command context", exception); } else if (exception instanceof ActivitiOptimisticLockingException) { // reduce log level, as normally we're not interested in logging this exception log.debug("Optimistic locking exception : " + exception); } else { log.debug("Error while closing command context", exception); } transactionContext.rollback(); } } } catch (Throwable exception) { exception(exception); } finally { closeSessions(); } } catch (Throwable exception) { exception(exception); } // rethrow the original exception if there was one if (exception != null) { if (exception instanceof Error) { throw (Error) exception; } else if (exception instanceof RuntimeException) { throw (RuntimeException) exception; } else { throw new ActivitiException("exception while executing command " + command, exception); } } }
close方法在没有异常的情况下会调用flushSessions,最终会执行到下面的flush方法
/**
 * Flushes pending work in the order inserts, updates, deletes.
 *
 * <p>Before flushing, unnecessary operations are removed, deserialized objects are
 * flushed and the list of updated objects is computed. When debug logging is enabled,
 * every pending insert, update and delete operation is logged, followed by a summary.
 */
public void flush() {
    List<DeleteOperation> prunedOperations = removeUnnecessaryOperations();

    flushDeserializedObjects();
    List<PersistentObject> pendingUpdates = getUpdatedObjects();

    if (log.isDebugEnabled()) {
        int insertCount = 0;
        int updateCount = 0;
        int deleteCount = 0;

        for (List<PersistentObject> insertsOfOneType : insertedObjects.values()) {
            for (PersistentObject pendingInsert : insertsOfOneType) {
                log.debug(" insert {}", pendingInsert);
                insertCount++;
            }
        }

        for (PersistentObject pendingUpdate : pendingUpdates) {
            log.debug(" update {}", pendingUpdate);
            updateCount++;
        }

        // Logs the deleteOperations field, not the list returned by removeUnnecessaryOperations().
        for (DeleteOperation pendingDelete : deleteOperations) {
            log.debug(" {}", pendingDelete);
            deleteCount++;
        }

        log.debug("flush summary: {} insert, {} update, {} delete.", insertCount, updateCount, deleteCount);
        log.debug("now executing flush...");
    }

    flushInserts();
    flushUpdates(pendingUpdates);
    flushDeletes(prunedOperations);
}
flush中会调用flushUpdates,也就是说GetNextIdBlockCmd里对next.dbid设置的新值,要等到命令上下文关闭、执行flush时才真正更新到数据库。因此在高并发的场景下,可能一个线程读取一段block之后,还没有来得及update,同一个值就已经被另一线程读取,造成两边拿到同一段id、id已经被占用的情况。为解决高并发下的这个问题,可以采用uuid策略。
org.activiti.engine.impl.persistence.StrongUuidGenerator
public class StrongUuidGenerator implements IdGenerator { // different ProcessEngines on the same classloader share one generator. protected static TimeBasedGenerator timeBasedGenerator; public StrongUuidGenerator() { ensureGeneratorInitialized(); } protected void ensureGeneratorInitialized() { if (timeBasedGenerator == null) { synchronized (StrongUuidGenerator.class) { if (timeBasedGenerator == null) { timeBasedGenerator = Generators.timeBasedGenerator(EthernetAddress.fromInterface()); } } } } public String getNextId() { return timeBasedGenerator.generate().toString(); } }
采用的是com.fasterxml.uuid.impl.TimeBasedGenerator
UUID id generator for high concurrency