Directory dire = NIOFSDirectory.open(FileSystems.getDefault().getPath(indexDirectory)); IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer()); iwc.setRAMBufferSizeMB(64); //兆默认刷 indexWriter = new IndexWriter(dire, iwc); Document doc = createDocument(artiste, skuId); indexWriter.addDocument(doc); indexWriter.commit(); indexWriter.close();
IndexWriterConfig //properties this.analyzer = analyzer; ramBufferSizeMB = IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB;//默认超过16M就会触发flush磁盘操作 maxBufferedDocs = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS;//默认按照RAM空间大小触发flush maxBufferedDeleteTerms = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DELETE_TERMS;// mergedSegmentWarmer = null; delPolicy = new KeepOnlyLastCommitDeletionPolicy();//删除策略 commit = null; useCompoundFile = IndexWriterConfig.DEFAULT_USE_COMPOUND_FILE_SYSTEM; openMode = OpenMode.CREATE_OR_APPEND;//IndexWriter打开模式 similarity = IndexSearcher.getDefaultSimilarity();//相似度计算,一般初始化Searcher的时候会用(因为只有查询的时候才会用到相似度计算) mergeScheduler = new ConcurrentMergeScheduler();//每个segement的merge交个一个线程完成 writeLockTimeout = IndexWriterConfig.WRITE_LOCK_TIMEOUT;//写操作遇到锁超时时间 indexingChain = DocumentsWriterPerThread.defaultIndexingChain; codec = Codec.getDefault(); if (codec == null) { throw new NullPointerException(); } infoStream = InfoStream.getDefault(); mergePolicy = new TieredMergePolicy();//合并策略 flushPolicy = new FlushByRamOrCountsPolicy();//flush策略 readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING; indexerThreadPool = new DocumentsWriterPerThreadPool(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES);//并发写索引线程池 perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
/**
 * Updates a document by delegating to the documents writer
 * ({@code docWriter.updateDocument}) with this writer's analyzer and the given term.
 *
 * <p>If the delegate returns {@code true}, {@code processEvents(true, false)} is called.
 * If the delegate throws, a message is written to the "IW" info stream (when enabled) and
 * the exception keeps propagating. {@code AbortingException} and {@code OutOfMemoryError}
 * are caught and handed to {@code tragicEvent}; whether that rethrows is not shown here.
 *
 * @param term term passed to the documents writer as its delete term
 *     (NOTE(review): presumably selects the documents being replaced; confirm)
 * @param doc fields of the new document
 * @throws IOException if updating the document fails
 */
public void updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException {
    ensureOpen();
    try {
        boolean success = false;
        try {
            // NOTE(review): a true return presumably means events are pending and must be
            // processed now; confirm against docWriter.updateDocument.
            if (docWriter.updateDocument(doc, analyzer, term)) {
                processEvents(true, false);
            }
            success = true;
        } finally {
            // Failure path: log only; the exception continues to propagate.
            if (!success) {
                if (infoStream.isEnabled("IW")) {
                    infoStream.message("IW", "hit exception updating document");
                }
            }
        }
    } catch (AbortingException | OutOfMemoryError tragedy) {
        // Aborting exceptions and OOM are routed to the tragic-event handler.
        tragicEvent(tragedy, "updateDocument");
    }
}
在看 docWriter.updateDocument(doc, analyzer, term) 的代码之前，我们先看几个 Lucene 自建的类，下面着重分析一下：
/*{@link ThreadState} references and guards a * {@link DocumentsWriterPerThread} instance that is used during indexing to build a in-memory index segment. */ final static class ThreadState extends ReentrantLock { DocumentsWriterPerThread dwpt; // TODO this should really be part of DocumentsWriterFlushControl // write access guarded by DocumentsWriterFlushControl volatile boolean flushPending = false; // TODO this should really be part of DocumentsWriterFlushControl // write access guarded by DocumentsWriterFlushControl long bytesUsed = 0; // guarded by Reentrant lock private boolean isActive = true; ThreadState(DocumentsWriterPerThread dpwt) { this.dwpt = dpwt; }
/**
 * Creates a pool with {@code maxNumThreadStates} thread-state slots.
 *
 * <p>Every slot is pre-filled with a {@code ThreadState} that has no
 * {@code DocumentsWriterPerThread} yet (constructed with {@code null}). NOTE(review): the
 * DWPT is presumably attached later, when the state is first used (see ensureInitialized).
 * The free list is allocated with the same capacity, and no states are active initially.
 *
 * <p>The original article notes the default is 8. That is presumably
 * IndexWriterConfig.DEFAULT_MAX_THREAD_STATES; confirm for the Lucene version in use.
 *
 * @param maxNumThreadStates maximum number of thread states; must be at least 1
 * @throws IllegalArgumentException if {@code maxNumThreadStates < 1}
 */
DocumentsWriterPerThreadPool(int maxNumThreadStates) {
    if (maxNumThreadStates < 1) {
        throw new IllegalArgumentException("maxNumThreadStates must be >= 1 but was: " + maxNumThreadStates);
    }
    threadStates = new ThreadState[maxNumThreadStates];
    numThreadStatesActive = 0;
    for (int i = 0; i < threadStates.length; i++) {
        threadStates[i] = new ThreadState(null);
    }
    freeList = new ThreadState[maxNumThreadStates];
}
docWriter.updateDocument(doc, analyzer, term) boolean updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer, final Term delTerm) throws IOException, AbortingException { boolean hasEvents = preUpdate(); final ThreadState perThread = flushControl.obtainAndLock(); final DocumentsWriterPerThread flushingDWPT; try { if (!perThread.isActive()) { ensureOpen(); assert false: "perThread is not active but we are still open"; } ensureInitialized(perThread);//真正初始化单个具体线程DocumentsWriterPerThread assert perThread.isInitialized(); final DocumentsWriterPerThread dwpt = perThread.dwpt; final int dwptNumDocs = dwpt.getNumDocsInRAM(); try { dwpt.updateDocument(doc, analyzer, delTerm); //DocumentsWriterPerThread线程真正更新文档 } catch (AbortingException ae) { flushControl.doOnAbort(perThread); dwpt.abort(); throw ae; } finally { // We don't know whether the document actually // counted as being indexed, so we must subtract here to // accumulate our separate counter: numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs); } final boolean isUpdate = delTerm != null; flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); } finally { perThreadPool.release(perThread);//将该线程重新放回到线程池中,释放掉资源 } return postUpdate(flushingDWPT, hasEvents); }
ThreadState obtainAndLock() { final ThreadState perThread = perThreadPool.getAndLock(Thread .currentThread(), documentsWriter);//从线程池中拿取一个ThreadState boolean success = false; try { if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) { // There is a flush-all in process and this DWPT is // now stale -- enroll it for flush and try for // another DWPT: addFlushableState(perThread); } success = true; // simply return the ThreadState even in a flush all case sine we already hold the lock return perThread; } finally { if (!success) { // make sure we unlock if this fails perThreadPool.release(perThread); } } }
public void updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, Term delTerm) throws IOException, AbortingException { testPoint("DocumentsWriterPerThread addDocument start"); assert deleteQueue != null; reserveOneDoc(); docState.doc = doc; docState.analyzer = analyzer; docState.docID = numDocsInRAM; if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) { infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name); } // Even on exception, the document is still added (but marked // deleted), so we don't need to un-reserve at that point. // Aborting exceptions will actually "lose" more than one // document, so the counter will be "wrong" in that case, but // it's very hard to fix (we can't easily distinguish aborting // vs non-aborting exceptions): boolean success = false; try { try { consumer.processDocument(); } finally { docState.clear(); } success = true; } finally { if (!success) { // mark document as deleted deleteDocID(docState.docID); numDocsInRAM++; } } finishDocument(delTerm); }
DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread)
Lucene 实现了一个默认的 DocConsumer，即 DefaultIndexingChain。接下来只需看该 DocConsumer 是如何处理该 Document 的就行了。
@Override public void processDocument() throws IOException, AbortingException { // How many indexed field names we've seen (collapses // multiple field instances by the same name): int fieldCount = 0; long fieldGen = nextFieldGen++; // NOTE: we need two passes here, in case there are // multi-valued fields, because we must process all // instances of a given field at once, since the // analyzer is free to reuse TokenStream across fields // (i.e., we cannot have more than one TokenStream // running "at once"): termsHash.startDocument(); fillStoredFields(docState.docID); startStoredFields(); boolean aborting = false; try { for (IndexableField field : docState.doc) {//挨个遍历每个Field做处理,哈哈,终于露出可爱的尾巴了 fieldCount = processField(field, fieldGen, fieldCount); } } catch (AbortingException ae) { aborting = true; throw ae; } finally { if (aborting == false) { // Finish each indexed field name seen in the document: for (int i=0;i<fieldCount;i++) { fields[i].finish(); } finishStoredFields(); } } try { termsHash.finishDocument(); } catch (Throwable th) { // Must abort, on the possibility that on-disk term // vectors are now corrupt: throw AbortingException.wrap(th); } }
看到上面的代码，我笑了。哈哈，是不是越来越清晰了？对该 Document 的处理，无非就是遍历其中的每个 Field，并对每个 Field 分别做处理。至于具体每个 Field 如何处理，本 wiki 不涉及，放到另一篇 wiki 中深入记录（参考：Document 存储细节）。
传统的 UNIX 实现在内核中设有缓冲区高速缓存或页面高速缓存，大多数磁盘 I/O 都通过缓冲进行。当将数据写入文件时，内核通常先将该数据复制到其中一个缓冲区中；如果该缓冲区尚未写满，则并不将其排入输出队列，而是等待其写满，或者当内核需要重用该缓冲区以便存放其他磁盘块数据时，再将该缓冲区排入输出队列，待其到达队首时，才进行实际的 I/O 操作。这种输出方式被称为延迟写（delayed write）（Bach [1986] 第 3 章详细讨论了缓冲区高速缓存）。 延迟写减少了磁盘读写次数，但却降低了文件内容的更新速度，使得欲写到文件中的数据在一段时间内并没有写到磁盘上。当系统发生故障时，这种延迟可能造成文件更新内容的丢失。为了保证磁盘上实际文件系统与缓冲区高速缓存中内容的一致性，UNIX 系统提供了 sync、fsync 和 fdatasync 三个函数。 sync 函数只是将所有修改过的块缓冲区排入写队列，然后就返回，它并不等待实际写磁盘操作结束。 通常称为 update 的系统守护进程会周期性地（一般每隔 30 秒）调用 sync 函数，这就保证了定期冲洗内核的块缓冲区。命令 sync(1) 也调用 sync 函数。 fsync 函数只对由文件描述符 filedes 指定的单一文件起作用，并且等待写磁盘操作结束后才返回。fsync 可用于数据库这样的应用程序，这种应用程序需要确保将修改过的块立即写到磁盘上。 fdatasync 函数类似于 fsync，但它只影响文件的数据部分；而除数据外，fsync 还会同步更新文件的属性。 对于提供事务支持的数据库，在事务提交时，都要确保事务日志（包含该事务所有的修改操作以及一个提交记录）完全写到硬盘上，才认定事务提交成功并返回给应用层。Lucene 的 commit 也是同样的道理：提交时需要把新写入的索引文件 fsync 到磁盘，才能保证提交在系统崩溃后依然有效。下面的 commitInternal 就是提交的入口。
private final void commitInternal(MergePolicy mergePolicy) throws IOException { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "commit: start"); } synchronized(commitLock) { ensureOpen(false); if (infoStream.isEnabled("IW")) { infoStream.message("IW", "commit: enter lock"); } if (pendingCommit == null) { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "commit: now prepare"); } prepareCommitInternal(mergePolicy);//最关键的一行 } else { if (infoStream.isEnabled("IW")) { infoStream.message("IW", "commit: already prepared"); } } finishCommit(); } }