本文档旨在分析Lucene如何把业务信息写到磁盘上的大致流程,并不涉及Document中每个Field如何存储(该部分放在另外一篇wiki中介绍)。
Directory dire = NIOFSDirectory.open(FileSystems.getDefault().getPath(indexDirectory)); IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer()); iwc.setRAMBufferSizeMB(64); //兆默认刷 indexWriter = new IndexWriter(dire, iwc); Document doc = createDocument(artiste, skuId); indexWriter.addDocument(doc); indexWriter.commit(); indexWriter.close();
NIOFSDirectory.open()
如果是64位JRE会得到MMapDirectory(采用内存映射的方式写索引数据到File中)。
IndexWriterConfig //properties this.analyzer = analyzer; ramBufferSizeMB = IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB;//默认超过16M就会触发flush磁盘操作 maxBufferedDocs = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS;//默认按照RAM空间大小触发flush maxBufferedDeleteTerms = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DELETE_TERMS;// mergedSegmentWarmer = null; delPolicy = new KeepOnlyLastCommitDeletionPolicy();//删除策略 commit = null; useCompoundFile = IndexWriterConfig.DEFAULT_USE_COMPOUND_FILE_SYSTEM; openMode = OpenMode.CREATE_OR_APPEND;//IndexWriter打开模式 similarity = IndexSearcher.getDefaultSimilarity();//相似度计算,一般初始化Searcher的时候会用(因为只有查询的时候才会用到相似度计算) mergeScheduler = new ConcurrentMergeScheduler();//每个segement的merge交个一个线程完成 writeLockTimeout = IndexWriterConfig.WRITE_LOCK_TIMEOUT;//写操作遇到锁超时时间 indexingChain = DocumentsWriterPerThread.defaultIndexingChain; codec = Codec.getDefault(); if (codec == null) { throw new NullPointerException(); } infoStream = InfoStream.getDefault(); mergePolicy = new TieredMergePolicy();//合并策略 flushPolicy = new FlushByRamOrCountsPolicy();//flush策略 readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING; indexerThreadPool = new DocumentsWriterPerThreadPool(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES);//并发写索引线程池 perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
可以对IndexWriter做一些属性配置,IndexWriterConfig里面有非常丰富的各种配置。
这个步骤比较简单,主要是将业务字段组装成一个Document。一个Document由多个Field组成的。
每个Filed一般有四个属性组成:
添加一个Document,其实调用的是updateDocument。而Lucene更新Document不像Mysql可以直接更新某一条记录,所以只能先删除这条记录(Document),然后再添加上这条Document。下面参数Term,是一个检索条件,满足条件的Document做更新。
public void updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException { ensureOpen(); try { boolean success = false; try { if (docWriter.updateDocument(doc, analyzer, term)) { processEvents(true, false); } success = true; } finally { if (!success) { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "hit exception updating document"); } } } } catch (AbortingException | OutOfMemoryError tragedy) { tragicEvent(tragedy, "updateDocument"); } }
这里从下面几个角度阐述下为什么Lucene不能直接更新一个Document?
所以上面添加一个Document,最后演变成了更新一个Document。并且updateDocument包含两个串行操作
(1)先检索,如果有满足条件的Document,则删除
(2)如果没有满足条件的Document,则直接添加到内存中
在看docWriter.updateDocument(doc, analyzer, term)代码之前,我们先看几个Lucene子建的类,下面着重分析下:
Lucene内部实现的一个DocumentsWriterPerThread池(并不是严格意义的线程池),主要是
实现DocumentsWriterPerThread的重用(准确来说是实现ThreadState的重用)。该类可以简单理解一个线程池。
/*{@link ThreadState} references and guards a * {@link DocumentsWriterPerThread} instance that is used during indexing to build a in-memory index segment. */ final static class ThreadState extends ReentrantLock { DocumentsWriterPerThread dwpt; // TODO this should really be part of DocumentsWriterFlushControl // write access guarded by DocumentsWriterFlushControl volatile boolean flushPending = false; // TODO this should really be part of DocumentsWriterFlushControl // write access guarded by DocumentsWriterFlushControl long bytesUsed = 0; // guarded by Reentrant lock private boolean isActive = true; ThreadState(DocumentsWriterPerThread dpwt) { this.dwpt = dpwt; }
本质是个读写锁,用来配合DocumentsWriterPerThread来完成对一个Document的写操作。
简单理解成一个Document的写线程。线程池保证了DocumentsWriterPerThread的重用。
控制DocumentsWriterPerThread完成index过程中flush操作
刷新策略
理解了ThreadState这个类应该就简单了,甚至可以直接把该类看做带读写锁控制的写线程。其实是ThreadState内部引用DocumentWriterPerThread实例。在线程池初始化的时候就创建了8个ThreadState(这个时候并没有初始化,意思是DocumentWriterPerThread并没有新建起来,而是延迟初始化具体线程)。后面就尽量重用这个8个ThreadState。
DocumentsWriterPerThreadPool(int maxNumThreadStates) {//默认maxNumThreadStates=8 if (maxNumThreadStates < 1) { throw new IllegalArgumentException("maxNumThreadStates must be >= 1 but was: " + maxNumThreadStates); } threadStates = new ThreadState[maxNumThreadStates]; numThreadStatesActive = 0; for (int i = 0; i < threadStates.length; i++) { threadStates[i] = new ThreadState(null); } freeList = new ThreadState[maxNumThreadStates]; }
好了,看完了几个基础类,回到上面updateDocument最关键的是这一行。
docWriter.updateDocument(doc, analyzer, term) boolean updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer, final Term delTerm) throws IOException, AbortingException { boolean hasEvents = preUpdate(); final ThreadState perThread = flushControl.obtainAndLock(); final DocumentsWriterPerThread flushingDWPT; try { if (!perThread.isActive()) { ensureOpen(); assert false: "perThread is not active but we are still open"; } ensureInitialized(perThread);//真正初始化单个具体线程DocumentsWriterPerThread assert perThread.isInitialized(); final DocumentsWriterPerThread dwpt = perThread.dwpt; final int dwptNumDocs = dwpt.getNumDocsInRAM(); try { dwpt.updateDocument(doc, analyzer, delTerm); //DocumentsWriterPerThread线程真正更新文档 } catch (AbortingException ae) { flushControl.doOnAbort(perThread); dwpt.abort(); throw ae; } finally { // We don't know whether the document actually // counted as being indexed, so we must subtract here to // accumulate our separate counter: numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs); } final boolean isUpdate = delTerm != null; flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); } finally { perThreadPool.release(perThread);//将该线程重新放回到线程池中,释放掉资源 } return postUpdate(flushingDWPT, hasEvents); }
从线程池中获取一个ThreadState
ThreadState obtainAndLock() { final ThreadState perThread = perThreadPool.getAndLock(Thread .currentThread(), documentsWriter);//从线程池中拿取一个ThreadState boolean success = false; try { if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) { // There is a flush-all in process and this DWPT is // now stale -- enroll it for flush and try for // another DWPT: addFlushableState(perThread); } success = true; // simply return the ThreadState even in a flush all case sine we already hold the lock return perThread; } finally { if (!success) { // make sure we unlock if this fails perThreadPool.release(perThread); } } }
该Document的更新交给一个DocumentsWriterPerThread之后,我们再往下看。
public void updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, Term delTerm) throws IOException, AbortingException { testPoint("DocumentsWriterPerThread addDocument start"); assert deleteQueue != null; reserveOneDoc(); docState.doc = doc; docState.analyzer = analyzer; docState.docID = numDocsInRAM; if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) { infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name); } // Even on exception, the document is still added (but marked // deleted), so we don't need to un-reserve at that point. // Aborting exceptions will actually "lose" more than one // document, so the counter will be "wrong" in that case, but // it's very hard to fix (we can't easily distinguish aborting // vs non-aborting exceptions): boolean success = false; try { try { consumer.processDocument(); } finally { docState.clear(); } success = true; } finally { if (!success) { // mark document as deleted deleteDocID(docState.docID); numDocsInRAM++; } } finishDocument(delTerm); }
该线程里面我们只关心一行代码
consumer.processDocument();
从这里差不多就豁然开朗了,一切最后该Document的处理是交给了一个DocConsumer来处理。而这个DocConsumer的获取见下:
abstract
DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread)
throws
IOException;
Lucene实现了一个默认的DocConsumer即:DefaultIndexingChain。 那接下来就看该DocConsumer是如何处理该Document的了就行了。
@Override public void processDocument() throws IOException, AbortingException { // How many indexed field names we've seen (collapses // multiple field instances by the same name): int fieldCount = 0; long fieldGen = nextFieldGen++; // NOTE: we need two passes here, in case there are // multi-valued fields, because we must process all // instances of a given field at once, since the // analyzer is free to reuse TokenStream across fields // (i.e., we cannot have more than one TokenStream // running "at once"): termsHash.startDocument(); fillStoredFields(docState.docID); startStoredFields(); boolean aborting = false; try { for (IndexableField field : docState.doc) {//挨个遍历每个Field做处理,哈哈,终于露出可爱的尾巴了 fieldCount = processField(field, fieldGen, fieldCount); } } catch (AbortingException ae) { aborting = true; throw ae; } finally { if (aborting == false) { // Finish each indexed field name seen in the document: for (int i=0;i<fieldCount;i++) { fields[i].finish(); } finishStoredFields(); } } try { termsHash.finishDocument(); } catch (Throwable th) { // Must abort, on the possibility that on-disk term // vectors are now corrupt: throw AbortingException.wrap(th); } }
看到上面代码,我笑了。哈哈,越来越清晰,有没有。对该Document的处理,无非就是演化成遍历每个Field,对Field做处理就行了。但是具体Field怎么处理,该wiki不涉及,放到另外一篇wiki中深入记录(参考: Document存储细节 )。
indexWriter.commit();
提交Commit完成如下工作:
sync操作具体的解释可参考如下一段解释:
传统的UNIX实现在内核中设有缓冲区高速缓存或页面高速缓存,大多数磁盘I/O都通过缓冲进行。当将数据写入文件时,内核通常先将该数据复制到其中一个缓冲区中,如果该缓冲区尚未写满,则并不将其排入输出队列,而是等待其写满或者当内核需要重用该缓冲区以便存放其他磁盘块数据时,再将该缓冲排入输出队列,然后待其到达队首时,才进行实际的I/O操作。这种输出方式被称为延迟写(delayed write)(Bach [1986]第3章详细讨论了缓冲区高速缓存)。 延迟写减少了磁盘读写次数,但是却降低了文件内容的更新速度,使得欲写到文件中的数据在一段时间内并没有写到磁盘上。当系统发生故障时,这种延迟可能造成文件更新内容的丢失。为了保证磁盘上实际文件系统与缓冲区高速缓存中内容的一致性,UNIX系统提供了sync、fsync和fdatasync三个函数。 sync函数只是将所有修改过的块缓冲区排入写队列,然后就返回,它并不等待实际写磁盘操作结束。 通常称为update的系统守护进程会周期性地(一般每隔30秒)调用sync函数。这就保证了定期冲洗内核的块缓冲区。命令sync(1)也调用sync函数。 fsync函数只对由文件描述符filedes指定的单一文件起作用,并且等待写磁盘操作结束,然后返回。fsync可用于数据库这样的应用程序,这种应用程序需要确保将修改过的块立即写到磁盘上。 fdatasync函数类似于fsync,但它只影响文件的数据部分。而除数据外,fsync还会同步更新文件的属性。 对于提供事务支持的数据库,在事务提交时,都要确保事务日志(包含该事务所有的修改操作以及一个提交记录)完全写到硬盘上,才认定事务提交成功并返回给应用层。
看完这段解释就能明白,sync操作就是将文件系统(甚至内核)中的缓存数据都刷新到disk上面,保证数据的安全性(OS挂掉,断电,数据不会丢失)。
那具体Lucene做了些什么呢?
private final void commitInternal(MergePolicy mergePolicy) throws IOException { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "commit: start"); } synchronized(commitLock) { ensureOpen(false); if (infoStream.isEnabled("IW")) { infoStream.message("IW", "commit: enter lock"); } if (pendingCommit == null) { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "commit: now prepare"); } prepareCommitInternal(mergePolicy);//最关键的一行 } else { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "commit: already prepared"); } } finishCommit(); } }
走到prepareCommitInternal里面就是详细的刷新操作,索引刷新操作放在另外一篇wiki中介绍。
刷新数据,关闭资源。往里走,逻辑还是很丰富的。等flush详细讲完之后,再回头看这部分。