Login | Register For Free | Help
Search for: (Advanced)

Mailing List Archive: Lucene: Java-User

Lucene index write performance optimization

 

 

Lucene java-user RSS feed   Index | Next | Previous | View Threaded


jamie at stimulussoft

Nov 10, 2009, 8:43 AM

Post #1 of 4 (383 views)
Permalink
Lucene index write performance optimization

Hi There

Our app spends a lot of time waiting for Lucene to finish writing to the
index. I'd like to minimize this. If you have a moment to spare, please
let me know whether my LuceneIndex class presented below can be improved upon.

It is used in the following way:

// NOTE(review): indexDocument() only enqueues the document; nothing is written to the index
// until luceneIndex.startup() has started the processor thread -- confirm it is called.
luceneIndex = new LuceneIndex(Config.getConfig().getIndex().getIndexBacklog(),
        exitReq, volume.getID() + " indexer", volume.getIndexPath(),
        Config.getConfig().getIndex().getMaxSimultaneousDocs());
Document doc = new Document();
IndexInfo indexInfo = new IndexInfo(doc);
luceneIndex.indexDocument(indexInfo);

As an aside, is there any way for Lucene to support simultaneous
writes to an index? For example, each writer thread could write to a
separate shard, and after a period the shards could be merged into a single
index. Or is this overkill? I am interested to hear the opinion of the
Lucene experts.

Thanks in advance

Jamie

package com.stimulus.archiva.index;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import org.apache.commons.logging.*;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.*;
import org.apache.lucene.store.FSDirectory;
import java.util.*;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.AlreadyClosedException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.*;

public class LuceneIndex extends Thread {

protected ArrayBlockingQueue<LuceneDocument> queue;
protected static final Log logger =
LogFactory.getLog(LuceneIndex.class.getName());
protected static final Log indexLog =
LogFactory.getLog("indexlog");
IndexWriter writer = null;
protected static ScheduledExecutorService scheduler;
protected static ScheduledFuture<?> scheduledTask;
protected LuceneDocument EXIT_REQ = null;
ReentrantLock indexLock = new ReentrantLock();
ArchivaAnalyzer analyzer = new ArchivaAnalyzer();
File indexLogFile;
PrintStream indexLogOut;
IndexProcessor indexProcessor;
String friendlyName;
String indexPath;
int maxSimultaneousDocs;

public LuceneIndex(int queueSize, LuceneDocument exitReq,
String friendlyName, String indexPath,
int maxSimultaneousDocs) {
this.queue = new
ArrayBlockingQueue<LuceneDocument>(queueSize);
this.EXIT_REQ = exitReq;
this.friendlyName = friendlyName;
this.indexPath = indexPath;
this.maxSimultaneousDocs = maxSimultaneousDocs;
setLog(friendlyName);
}


public int getMaxSimultaneousDocs() {
return maxSimultaneousDocs;
}

public void setMaxSimultaneousDocs(int maxSimultaneousDocs) {
this.maxSimultaneousDocs = maxSimultaneousDocs;
}


public ReentrantLock getIndexLock() {
return indexLock;
}

protected void setLog(String logName) {

try {
indexLogFile = getIndexLogFile(logName);
if (indexLogFile!=null) {
if (indexLogFile.length()>10485760)
indexLogFile.delete();
indexLogOut = new PrintStream(indexLogFile);
}
logger.debug("set index log file path
{path='"+indexLogFile.getCanonicalPath()+"'}");
} catch (Exception e) {
logger.error("failed to open index log
file:"+e.getMessage(),e);
}
}

protected File getIndexLogFile(String logName) {
try {
String logfilepath =
Config.getFileSystem().getLogPath()+File.separator+logName+"index.log";
return new File(logfilepath);
} catch (Exception e) {
logger.error("failed to open index log
file:"+e.getMessage(),e);
return null;
}
}



protected void openIndex() throws MessageSearchException {
Exception lastError = null;

if (writer==null) {
logger.debug("openIndex() index "+friendlyName+" will be
opened. it is currently closed.");
} else {
logger.debug("openIndex() did not bother opening index
"+friendlyName+". it is already open.");
return;
}
logger.debug("opening index "+friendlyName+" for write");
logger.debug("opening search index "+friendlyName+" for
write {indexpath='"+indexPath+"'}");
boolean writelock;
int attempt = 0;
int maxattempt = 10;

if (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
maxattempt = 10000;
} else {
maxattempt = 10;
}

do {
writelock = false;
try {
FSDirectory fsDirectory =
FSDirectory.getDirectory(indexPath);
int maxIndexChars =
Config.getConfig().getIndex().getMaxIndexPerFieldChars();
writer = new
IndexWriter(fsDirectory,analyzer,new
IndexWriter.MaxFieldLength(maxIndexChars));
if (indexLog.isDebugEnabled() &&
indexLogOut!=null) {
writer.setInfoStream(indexLogOut);
}
} catch (LockObtainFailedException lobfe) {
logger.debug("write lock on index
"+friendlyName+". will reopen in 50ms.");
try { Thread.sleep(50); } catch (Exception e) {}
attempt++;
writelock = true;
} catch (CorruptIndexException cie) {
throw new MessageSearchException("index
"+friendlyName+" appears to be corrupt. please reindex the active
volume."+cie.getMessage(),logger);
} catch (Throwable io) {
throw new MessageSearchException("failed to write
document to index "+friendlyName+":"+io.getMessage(),logger);
}
} while (writelock && attempt<maxattempt);
if (attempt>=10000)
throw new MessageSearchException("failed to open index
"+friendlyName+" writer {indexPath='"+indexPath+"'}",lastError,logger);
}

public void indexDocument(LuceneDocument luceneDocument) throws
MessageSearchException {
logger.debug("index document {"+luceneDocument+"}");
long s = (new Date()).getTime();
if (luceneDocument == null)
throw new MessageSearchException("assertion failure:
null document",logger);
try {
queue.put(luceneDocument);
} catch (InterruptedException ie) {
throw new MessageSearchException("failed to add document
to queue:"+ie.getMessage(),ie,logger);
}
logger.debug("document indexed successfully
{"+luceneDocument+"}");

logger.debug("indexing message end {"+luceneDocument+"}");
long e = (new Date()).getTime();
logger.debug("indexing time {time='"+(e-s)+"'}");
}

public class IndexProcessor extends Thread {

public IndexProcessor() {
setName("index processor");
}

public void run() {
boolean exit = false;
//ExecutorService documentPool;
// we abandoned pool as it does not seem to offer any
major performance benefit
LuceneDocument luceneDocument = null;
LinkedList<LuceneDocument> pushbacks = new
LinkedList<LuceneDocument>();

while (!exit) {

try {
//documentPool =
Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
luceneDocument = null;
luceneDocument = (LuceneDocument) queue.take();

indexLock.lock();

if (luceneDocument==EXIT_REQ) {
logger.debug("index exit req received.
exiting");
exit = true;
continue;
}


try {
openIndex();
} catch (Exception e) {
logger.error("failed to open
index:"+e.getMessage(),e);
return;
}
if (luceneDocument==null) {
logger.debug("index info is null");
}
int i = 0;
while(luceneDocument!=null &&
i<maxSimultaneousDocs) {
try {
Document doc = luceneDocument.getDocument();
String language = doc.get("lang");
if (language==null) {
language =
Config.getConfig().getIndex().getIndexLanguage();
}

writer.addDocument(doc,AnalyzerFactory.getAnalyzer(language,AnalyzerFactory.Operation.INDEX));
} catch (IOException io) {
logger.error("failed to add document to
index:"+io.getMessage(),io);
} catch (AlreadyClosedException e) {
pushbacks.add(luceneDocument);
break;
}
//documentPool.execute(new
IndexDocument(luceneDocument,pushbacks));

i++;
if (i<maxSimultaneousDocs) {
luceneDocument = (LuceneDocument)
queue.poll();

if (luceneDocument==null) {
logger.debug("index info is null");
}

if (luceneDocument==EXIT_REQ) {
logger.debug("index exit req
received. exiting (2)");
exit = true;
break;
}
}

}
if (pushbacks.size()>0) {
closeIndex();
try {
openIndex();
} catch (Exception e) {
logger.error("failed to open
index:"+e.getMessage(),e);
return;
}
for (LuceneDocument pushback : pushbacks) {
try {

writer.addDocument(pushback.getDocument());
} catch (IOException io) {
logger.error("failed to add
document to index:"+io.getMessage(),io);
} catch (AlreadyClosedException e) {
pushbacks.add(pushback);
}
//documentPool.execute(new
IndexDocument(pushback,pushbacks));
i++;
}
}

//documentPool.shutdown();

//documentPool.awaitTermination(30,TimeUnit.MINUTES);

} catch (Throwable ie) {
logger.error("index write
interrupted:"+ie.getMessage());
} finally {
closeIndex();
indexLock.unlock();
}
}
}

public class IndexDocument extends Thread {

LuceneDocument luceneDocument = null;
List<LuceneDocument> pushbacks = null;

public IndexDocument(LuceneDocument
luceneDocument,List<LuceneDocument> pushbacks) {
this.luceneDocument = luceneDocument;
this.pushbacks = pushbacks;
setName("index document");
}

public void run() {
try {

writer.addDocument(luceneDocument.getDocument());
} catch (IOException io) {
logger.error("failed to add document to
index:"+io.getMessage(),io);
} catch (AlreadyClosedException e) {
pushbacks.add(luceneDocument);
} catch (Throwable t) {
logger.error("failed to add document to
index:"+t.getMessage(),t);
}
}};
}

protected void closeIndex() {
try {
indexLock.lock();
if (writer!=null) {
writer.close();
}
} catch (Throwable io) {
logger.error("failed to close index
writer:"+io.getMessage(),io);
} finally {
logger.debug("writer closed");
writer = null;
indexLock.unlock();
}
}

public void deleteIndex() throws MessageSearchException {
logger.debug("delete index {indexpath='"+indexPath+"'}");
try {
indexLock.lock();
try {
int maxIndexChars =
Config.getConfig().getIndex().getMaxIndexPerFieldChars();
writer = new
IndexWriter(FSDirectory.getDirectory(indexPath),analyzer,true,new
IndexWriter.MaxFieldLength(maxIndexChars));
} catch (Throwable cie) {
logger.error("failed to delete index
{index='"+indexPath+"'}",cie);
return;
}
MessageIndex.volumeIndexes.remove(this);
} finally {
closeIndex();
indexLock.unlock();
}
}

public void startup() {
logger.debug("volumeindex is starting up");
File lockFile = new File(indexPath+File.separatorChar +
"write.lock");
if (lockFile.exists()) {
logger.warn("The server lock file already exists. Either
another indexer is running or the server was not shutdown correctly.");
logger.warn("If it is the latter, the lock file must be
manually deleted at "+lockFile.getAbsolutePath());
if
(Config.getConfig().getIndex().getMultipleIndexProcesses()) {
logger.debug("index lock file detected on
volumeindex startup.");
} else {
logger.warn("index lock file detected. the server
was shutdown incorrectly. automatically deleting lock file.");
logger.warn("indexer is configured to deal with only
one indexer process.");
logger.warn("if you are running more than one
indexer, your index could be subject to corruption.");
lockFile.delete();
}
}
indexProcessor = new IndexProcessor();
indexProcessor.start();
Runtime.getRuntime().addShutdownHook(this);

}

public void shutdown() {
logger.debug("volumeindex is shutting down");
queue.add(EXIT_REQ);
scheduler.shutdownNow();

}

@Override
public void run() {
queue.add(EXIT_REQ);
}


public interface LuceneDocument {

public String toString();
public Document getDocument();
public void finalize();

}

}






---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscribe [at] lucene
For additional commands, e-mail: java-user-help [at] lucene


glen.newton at gmail

Nov 10, 2009, 8:54 AM

Post #2 of 4 (369 views)
Permalink
Re: Lucene index write performance optimization [In reply to]

You might try re-implementing, using ThreadPoolExecutor
http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.html

glen

2009/11/10 Jamie Band <jamie [at] stimulussoft>:
> Hi There
>
> Our app spends alot of time waiting for Lucene to finish writing to the
> index. I'd like to minimize this. If you have a moment to spare, please let
> me know if my LuceneIndex class presented below can be improved upon.
>
> It is used in the following way:
>
> luceneIndex = new
> LuceneIndex(Config.getConfig().getIndex().getIndexBacklog(),
>                                           exitReq,volume.getID()+"
> indexer",volume.getIndexPath(),
>
> Config.getConfig().getIndex().getMaxSimultaneousDocs());
> Document doc = new Document();
> IndexInfo indexInfo = new IndexInfo(doc);
> luceneIndex.indexDocument(indexInfo);
>
> As an aside note, is there any way for Lucene to support simultaneous writes
> to an index? For example, each write threads could write to a separate
> shard, after a period the shared could be merged into a single index? Or is
> this overkill? I am interested hear the opinion of the Lucene experts.
>
> Thanks in advance
>
> Jamie
>
> package com.stimulus.archiva.index;
>
> import java.io.File;
> import java.io.IOException;
> import java.io.PrintStream;
> import org.apache.commons.logging.*;
> import org.apache.lucene.document.Document;
> import org.apache.lucene.index.*;
> import org.apache.lucene.store.FSDirectory;
> import java.util.*;
> import org.apache.lucene.store.LockObtainFailedException;
> import org.apache.lucene.store.AlreadyClosedException;
> import java.util.concurrent.locks.ReentrantLock;
> import java.util.concurrent.*;
>
> public class LuceneIndex extends Thread {
>          protected ArrayBlockingQueue<LuceneDocument> queue;
>        protected static final Log logger =
> LogFactory.getLog(LuceneIndex.class.getName());
>        protected static final Log indexLog = LogFactory.getLog("indexlog");
>           IndexWriter writer = null;
>           protected static ScheduledExecutorService scheduler;
>        protected static ScheduledFuture<?> scheduledTask;
>        protected LuceneDocument EXIT_REQ = null;
>        ReentrantLock indexLock = new ReentrantLock();
>        ArchivaAnalyzer analyzer     = new ArchivaAnalyzer();
>        File indexLogFile;
>        PrintStream indexLogOut;
>        IndexProcessor indexProcessor;
>        String friendlyName;
>        String indexPath;
>        int maxSimultaneousDocs;
>                   public LuceneIndex(int queueSize, LuceneDocument exitReq,
>                                String friendlyName, String indexPath, int
>  maxSimultaneousDocs) {
>               this.queue = new
> ArrayBlockingQueue<LuceneDocument>(queueSize);
>               this.EXIT_REQ = exitReq;
>               this.friendlyName = friendlyName;
>               this.indexPath = indexPath;
>               this.maxSimultaneousDocs = maxSimultaneousDocs;
>               setLog(friendlyName);
>           }
>                             public int getMaxSimultaneousDocs() {
>             return maxSimultaneousDocs;
>         }
>                 public void setMaxSimultaneousDocs(int maxSimultaneousDocs)
> {
>             this.maxSimultaneousDocs = maxSimultaneousDocs;
>         }
>                             public ReentrantLock getIndexLock() {
>             return indexLock;
>         }
>             protected void setLog(String logName) {
>
>               try {
>                   indexLogFile = getIndexLogFile(logName);
>                   if (indexLogFile!=null) {
>                       if (indexLogFile.length()>10485760)
>                           indexLogFile.delete();
>                       indexLogOut = new PrintStream(indexLogFile);
>                   }
>                   logger.debug("set index log file path
> {path='"+indexLogFile.getCanonicalPath()+"'}");
>               } catch (Exception e) {
>                   logger.error("failed to open index log
> file:"+e.getMessage(),e);
>               }
>         }
>               protected File getIndexLogFile(String logName) {
>              try {
>                   String logfilepath =
> Config.getFileSystem().getLogPath()+File.separator+logName+"index.log";
>                   return new File(logfilepath);
>               } catch (Exception e) {
>                   logger.error("failed to open index log
> file:"+e.getMessage(),e);
>                   return null;
>               }
>         }
>                             protected void openIndex() throws
> MessageSearchException {
>           Exception lastError = null;
>                     if (writer==null) {
>               logger.debug("openIndex() index "+friendlyName+" will be
> opened. it is currently closed.");
>           } else {
>               logger.debug("openIndex() did not bother opening index
> "+friendlyName+". it is already open.");
>               return;
>           }
>           logger.debug("opening index "+friendlyName+" for write");
>           logger.debug("opening search index "+friendlyName+" for write
> {indexpath='"+indexPath+"'}");
>           boolean writelock;
>           int attempt = 0;
>           int maxattempt = 10;
>                     if
> (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
>               maxattempt = 10000;
>            } else {
>               maxattempt = 10;
>            }
>                     do {
>               writelock = false;
>               try {
>                       FSDirectory fsDirectory =
> FSDirectory.getDirectory(indexPath);
>                       int maxIndexChars =
> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
>                       writer = new IndexWriter(fsDirectory,analyzer,new
> IndexWriter.MaxFieldLength(maxIndexChars));
>                       if (indexLog.isDebugEnabled() && indexLogOut!=null) {
>                           writer.setInfoStream(indexLogOut);
>                       }
>               } catch (LockObtainFailedException lobfe) {
>                       logger.debug("write lock on index "+friendlyName+".
> will reopen in 50ms.");
>                       try { Thread.sleep(50); } catch (Exception e) {}
>                       attempt++;
>                       writelock = true;
>               } catch (CorruptIndexException cie) {
>                   throw new MessageSearchException("index "+friendlyName+"
> appears to be corrupt. please reindex the active
> volume."+cie.getMessage(),logger);
>               } catch (Throwable io) {
>                   throw new MessageSearchException("failed to write document
> to index "+friendlyName+":"+io.getMessage(),logger);
>               }
>          } while (writelock && attempt<maxattempt);
>          if (attempt>=10000)
>            throw new MessageSearchException("failed to open index
> "+friendlyName+" writer {indexPath='"+indexPath+"'}",lastError,logger);
>       }
>             public void indexDocument(LuceneDocument luceneDocument) throws
> MessageSearchException {
>           logger.debug("index document {"+luceneDocument+"}");
>           long s = (new Date()).getTime();
>           if (luceneDocument == null)
>               throw new MessageSearchException("assertion failure: null
> document",logger);
>           try {
>               queue.put(luceneDocument);
>           } catch (InterruptedException ie) {
>               throw new MessageSearchException("failed to add document to
> queue:"+ie.getMessage(),ie,logger);
>           }
>           logger.debug("document indexed successfully
> {"+luceneDocument+"}");
>                     logger.debug("indexing message end
> {"+luceneDocument+"}");
>           long e = (new Date()).getTime();
>           logger.debug("indexing time {time='"+(e-s)+"'}");
>       }
>         public class IndexProcessor extends Thread {
>                     public IndexProcessor() {
>               setName("index processor");
>           }
>               public void run() {
>               boolean exit = false;
>               //ExecutorService documentPool;
>               // we abandoned pool as it does not seem to offer any major
> performance benefit
>               LuceneDocument luceneDocument = null;
> LinkedList<LuceneDocument> pushbacks = new LinkedList<LuceneDocument>();
>                             while (!exit) {
>                                     try {
>                       //documentPool =
> Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
>                       luceneDocument = null;
> luceneDocument = (LuceneDocument) queue.take();
>                                             indexLock.lock();
>                                             if (luceneDocument==EXIT_REQ) {
>                           logger.debug("index exit req received. exiting");
>                           exit = true;
>                           continue;
>                       }
>                                                       try {
>                            openIndex();
>                       } catch (Exception e) {
>                            logger.error("failed to open
> index:"+e.getMessage(),e);
>                            return;
>                       }
>                       if (luceneDocument==null) {
>                           logger.debug("index info is null");
>                       }
>                       int i = 0;
>                       while(luceneDocument!=null && i<maxSimultaneousDocs) {
>                           try {
>                               Document doc = luceneDocument.getDocument();
>                               String language = doc.get("lang");
>                               if (language==null) {
>                                   language =
> Config.getConfig().getIndex().getIndexLanguage();
>                               }
>
> writer.addDocument(doc,AnalyzerFactory.getAnalyzer(language,AnalyzerFactory.Operation.INDEX));
>                           } catch (IOException io) {
>                               logger.error("failed to add document to
> index:"+io.getMessage(),io);
>                           } catch (AlreadyClosedException e) {
>                               pushbacks.add(luceneDocument);
>                               break;
>                           }
>                             //documentPool.execute(new
> IndexDocument(luceneDocument,pushbacks));
>                                                      i++;
>                            if (i<maxSimultaneousDocs) {
>                                luceneDocument = (LuceneDocument)
> queue.poll();
>                                                                if
> (luceneDocument==null) {
>                                       logger.debug("index info is null");
>                                }
>                                                                if
> (luceneDocument==EXIT_REQ) {
>                                        logger.debug("index exit req
> received. exiting (2)");
>                                       exit = true;
>                                       break;
>                                 }
>                            }
>                                                   }
>                       if (pushbacks.size()>0) {
>                             closeIndex();
>                             try {
>                                    openIndex();
>                             } catch (Exception e) {
>                                logger.error("failed to open
> index:"+e.getMessage(),e);
>                                return;
>                             }
>                             for (LuceneDocument pushback : pushbacks) {
>                                   try {
>
> writer.addDocument(pushback.getDocument());
>                                   } catch (IOException io) {
>                                       logger.error("failed to add document
> to index:"+io.getMessage(),io);
>                                   } catch (AlreadyClosedException e) {
>                                       pushbacks.add(pushback);
>                                   }
>                                   //documentPool.execute(new
> IndexDocument(pushback,pushbacks));
>                                   i++;
>                             }
>                       }
>                                             //documentPool.shutdown();
>                       //documentPool.awaitTermination(30,TimeUnit.MINUTES);
>                                          } catch (Throwable ie) {
>                        logger.error("index write
> interrupted:"+ie.getMessage());
>                    } finally {
>                          closeIndex();
>                         indexLock.unlock();
>                   }
>               }                }
>                     public class IndexDocument extends Thread {
>                                 LuceneDocument luceneDocument = null;
>                   List<LuceneDocument> pushbacks = null;
>                                     public IndexDocument(LuceneDocument
> luceneDocument,List<LuceneDocument> pushbacks) {
>                       this.luceneDocument = luceneDocument;
>                       this.pushbacks = pushbacks;
>                       setName("index document");
>                   }
>                                 public void run() {
>                       try {
>                           writer.addDocument(luceneDocument.getDocument());
>                       } catch (IOException io) {
>                           logger.error("failed to add document to
> index:"+io.getMessage(),io);
>                       } catch (AlreadyClosedException e) {
>                           pushbacks.add(luceneDocument);
>                       } catch (Throwable t) {
>                           logger.error("failed to add document to
> index:"+t.getMessage(),t);
>                       }
>                   }};
>           }
>           protected void closeIndex() {
>            try {
>                indexLock.lock();
>                if (writer!=null) {
>                   writer.close();
>                }
>            } catch (Throwable io) {
>               logger.error("failed to close index
> writer:"+io.getMessage(),io);
>            } finally {
>                logger.debug("writer closed");
>                writer = null;
>                indexLock.unlock();
>            }
>       }
>           public void deleteIndex() throws MessageSearchException {
>                  logger.debug("delete index {indexpath='"+indexPath+"'}");
>                  try {
>                     indexLock.lock();
>                    try {
>                       int maxIndexChars =
> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
>                       writer = new
> IndexWriter(FSDirectory.getDirectory(indexPath),analyzer,true,new
> IndexWriter.MaxFieldLength(maxIndexChars));
>                    } catch (Throwable cie) {
>                        logger.error("failed to delete index
> {index='"+indexPath+"'}",cie);
>                        return;
>                    }
>                    MessageIndex.volumeIndexes.remove(this);
>                 } finally {
>                       closeIndex();
>                     indexLock.unlock();
>               }
>         }
>               public void startup() {
>           logger.debug("volumeindex is starting up");
>           File lockFile = new File(indexPath+File.separatorChar +
> "write.lock");
>           if (lockFile.exists()) {
>               logger.warn("The server lock file already exists. Either
> another indexer is running or the server was not shutdown correctly.");
>               logger.warn("If it is the latter, the lock file must be
> manually deleted at "+lockFile.getAbsolutePath());
>               if (Config.getConfig().getIndex().getMultipleIndexProcesses())
> {
>                   logger.debug("index lock file detected on volumeindex
> startup.");
>               } else {
>                   logger.warn("index lock file detected. the server was
> shutdown incorrectly. automatically deleting lock file.");
>                   logger.warn("indexer is configured to deal with only one
> indexer process.");
>                   logger.warn("if you are running more than one indexer,
> your index could be subject to corruption.");
>                   lockFile.delete();
>               }
>           }
>           indexProcessor = new IndexProcessor();
>           indexProcessor.start();
>           Runtime.getRuntime().addShutdownHook(this);
>                   }
>                 public void shutdown() {
>             logger.debug("volumeindex is shutting down");
>             queue.add(EXIT_REQ);
>             scheduler.shutdownNow();
>                   }
>                 @Override
>         public void run() {
>             queue.add(EXIT_REQ);
>         }
>                         public interface LuceneDocument {
>                         public String toString();
>             public Document getDocument();
>             public void finalize();
>                     }
>    }
>
>
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: java-user-unsubscribe [at] lucene
> For additional commands, e-mail: java-user-help [at] lucene
>
>



--

-


yonik at lucidimagination

Nov 10, 2009, 8:57 AM

Post #3 of 4 (373 views)
Permalink
Re: Lucene index write performance optimization [In reply to]

On Tue, Nov 10, 2009 at 11:43 AM, Jamie Band <jamie [at] stimulussoft> wrote:
> As an aside note, is there any way for Lucene to support simultaneous writes
> to an index?

The indexing process is highly parallelized... just use multiple
threads to add documents to the same IndexWriter.

-Yonik
http://www.lucidimagination.com

---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscribe [at] lucene
For additional commands, e-mail: java-user-help [at] lucene


otis_gospodnetic at yahoo

Nov 10, 2009, 8:17 PM

Post #4 of 4 (357 views)
Permalink
Re: Lucene index write performance optimization [In reply to]

This is what we have in Lucene in Action 2:

~/lia2$ ff \*Thread\*java
./src/lia/admin/CreateThreadedIndexTask.java
./src/lia/admin/ThreadedIndexWriter.java

Otis
--
Sematext is hiring -- http://sematext.com/about/jobs.html?mls
Lucene, Solr, Nutch, Katta, Hadoop, HBase, UIMA, NLP, NER, IR



----- Original Message ----
> From: Jamie Band <jamie [at] stimulussoft>
> To: java-user [at] lucene
> Sent: Tue, November 10, 2009 11:43:30 AM
> Subject: Lucene index write performance optimization
>
> Hi There
>
> Our app spends alot of time waiting for Lucene to finish writing to the index.
> I'd like to minimize this. If you have a moment to spare, please let me know if
> my LuceneIndex class presented below can be improved upon.
>
> It is used in the following way:
>
> luceneIndex = new LuceneIndex(Config.getConfig().getIndex().getIndexBacklog(),
> exitReq,volume.getID()+"
> indexer",volume.getIndexPath(),
>
> Config.getConfig().getIndex().getMaxSimultaneousDocs());
> Document doc = new Document();
> IndexInfo indexInfo = new IndexInfo(doc);
> luceneIndex.indexDocument(indexInfo);
>
> As an aside note, is there any way for Lucene to support simultaneous writes to
> an index? For example, each write threads could write to a separate shard, after
> a period the shared could be merged into a single index? Or is this overkill? I
> am interested hear the opinion of the Lucene experts.
>
> Thanks in advance
>
> Jamie
>
> package com.stimulus.archiva.index;
>
> import java.io.File;
> import java.io.IOException;
> import java.io.PrintStream;
> import org.apache.commons.logging.*;
> import org.apache.lucene.document.Document;
> import org.apache.lucene.index.*;
> import org.apache.lucene.store.FSDirectory;
> import java.util.*;
> import org.apache.lucene.store.LockObtainFailedException;
> import org.apache.lucene.store.AlreadyClosedException;
> import java.util.concurrent.locks.ReentrantLock;
> import java.util.concurrent.*;
>
> public class LuceneIndex extends Thread {
> protected ArrayBlockingQueuequeue;
> protected static final Log logger =
> LogFactory.getLog(LuceneIndex.class.getName());
> protected static final Log indexLog = LogFactory.getLog("indexlog");
> IndexWriter writer = null;
> protected static ScheduledExecutorService scheduler;
> protected static ScheduledFuture scheduledTask;
> protected LuceneDocument EXIT_REQ = null;
> ReentrantLock indexLock = new ReentrantLock();
> ArchivaAnalyzer analyzer = new ArchivaAnalyzer();
> File indexLogFile;
> PrintStream indexLogOut;
> IndexProcessor indexProcessor;
> String friendlyName;
> String indexPath;
> int maxSimultaneousDocs;
> public LuceneIndex(int queueSize, LuceneDocument exitReq,
> String friendlyName, String indexPath, int
> maxSimultaneousDocs) {
> this.queue = new ArrayBlockingQueue(queueSize);
> this.EXIT_REQ = exitReq;
> this.friendlyName = friendlyName;
> this.indexPath = indexPath;
> this.maxSimultaneousDocs = maxSimultaneousDocs;
> setLog(friendlyName);
> }
> public int getMaxSimultaneousDocs() {
> return maxSimultaneousDocs;
> }
> public void setMaxSimultaneousDocs(int maxSimultaneousDocs) {
> this.maxSimultaneousDocs = maxSimultaneousDocs;
> }
> public ReentrantLock getIndexLock() {
> return indexLock;
> }
> protected void setLog(String logName) {
>
> try {
> indexLogFile = getIndexLogFile(logName);
> if (indexLogFile!=null) {
> if (indexLogFile.length()>10485760)
> indexLogFile.delete();
> indexLogOut = new PrintStream(indexLogFile);
> }
> logger.debug("set index log file path
> {path='"+indexLogFile.getCanonicalPath()+"'}");
> } catch (Exception e) {
> logger.error("failed to open index log
> file:"+e.getMessage(),e);
> }
> }
> protected File getIndexLogFile(String logName) {
> try {
> String logfilepath =
> Config.getFileSystem().getLogPath()+File.separator+logName+"index.log";
> return new File(logfilepath);
> } catch (Exception e) {
> logger.error("failed to open index log
> file:"+e.getMessage(),e);
> return null;
> }
> }
> protected void openIndex() throws
> MessageSearchException {
> Exception lastError = null;
> if (writer==null) {
> logger.debug("openIndex() index "+friendlyName+" will be opened.
> it is currently closed.");
> } else {
> logger.debug("openIndex() did not bother opening index
> "+friendlyName+". it is already open.");
> return;
> }
> logger.debug("opening index "+friendlyName+" for write");
> logger.debug("opening search index "+friendlyName+" for write
> {indexpath='"+indexPath+"'}");
> boolean writelock;
> int attempt = 0;
> int maxattempt = 10;
> if
> (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
> maxattempt = 10000;
> } else {
> maxattempt = 10;
> }
> do {
> writelock = false;
> try {
> FSDirectory fsDirectory =
> FSDirectory.getDirectory(indexPath);
> int maxIndexChars =
> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
> writer = new IndexWriter(fsDirectory,analyzer,new
> IndexWriter.MaxFieldLength(maxIndexChars));
> if (indexLog.isDebugEnabled() && indexLogOut!=null) {
> writer.setInfoStream(indexLogOut);
> }
> } catch (LockObtainFailedException lobfe) {
> logger.debug("write lock on index "+friendlyName+". will
> reopen in 50ms.");
> try { Thread.sleep(50); } catch (Exception e) {}
> attempt++;
> writelock = true;
> } catch (CorruptIndexException cie) {
> throw new MessageSearchException("index "+friendlyName+"
> appears to be corrupt. please reindex the active
> volume."+cie.getMessage(),logger);
> } catch (Throwable io) {
> throw new MessageSearchException("failed to write document to
> index "+friendlyName+":"+io.getMessage(),logger);
> }
> } while (writelock && attempt<maxattempt);
> if (attempt>=10000)
> throw new MessageSearchException("failed to open index
> "+friendlyName+" writer {indexPath='"+indexPath+"'}",lastError,logger);
> }
> public void indexDocument(LuceneDocument luceneDocument) throws
> MessageSearchException {
> logger.debug("index document {"+luceneDocument+"}");
> long s = (new Date()).getTime();
> if (luceneDocument == null)
> throw new MessageSearchException("assertion failure: null
> document",logger);
> try {
> queue.put(luceneDocument);
> } catch (InterruptedException ie) {
> throw new MessageSearchException("failed to add document to
> queue:"+ie.getMessage(),ie,logger);
> }
> logger.debug("document indexed successfully {"+luceneDocument+"}");
> logger.debug("indexing message end {"+luceneDocument+"}");
> long e = (new Date()).getTime();
> logger.debug("indexing time {time='"+(e-s)+"'}");
> }
> public class IndexProcessor extends Thread {
> public IndexProcessor() {
> setName("index processor");
> }
> public void run() {
> boolean exit = false;
> //ExecutorService documentPool;
> // we abandoned pool as it does not seem to offer any major
> performance benefit
> LuceneDocument luceneDocument = null;
> LinkedList<LuceneDocument> pushbacks = new LinkedList();
> while (!exit) {
> try {
> //documentPool =
> Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
> luceneDocument = null;
> luceneDocument = (LuceneDocument) queue.take();
> indexLock.lock();
> if (luceneDocument==EXIT_REQ) {
> logger.debug("index exit req received. exiting");
> exit = true;
> continue;
> }
> try {
> openIndex();
> } catch (Exception e) {
> logger.error("failed to open
> index:"+e.getMessage(),e);
> return;
> }
> if (luceneDocument==null) {
> logger.debug("index info is null");
> }
> int i = 0;
> while(luceneDocument!=null && i<maxSimultaneousDocs) {
> try {
> Document doc = luceneDocument.getDocument();
> String language = doc.get("lang");
> if (language==null) {
> language =
> Config.getConfig().getIndex().getIndexLanguage();
> }
>
> writer.addDocument(doc,AnalyzerFactory.getAnalyzer(language,AnalyzerFactory.Operation.INDEX));
> } catch (IOException io) {
> logger.error("failed to add document to
> index:"+io.getMessage(),io);
> } catch (AlreadyClosedException e) {
> pushbacks.add(luceneDocument);
> break;
> }
> //documentPool.execute(new
> IndexDocument(luceneDocument,pushbacks));
> i++;
> if (i<maxSimultaneousDocs) {
> luceneDocument = (LuceneDocument) queue.poll();
> if
> (luceneDocument==null) {
> logger.debug("index info is null");
> }
> if
> (luceneDocument==EXIT_REQ) {
> logger.debug("index exit req received.
> exiting (2)");
> exit = true;
> break;
> }
> }
> }
> if (pushbacks.size()>0) {
> closeIndex();
> try {
> openIndex();
> } catch (Exception e) {
> logger.error("failed to open
> index:"+e.getMessage(),e);
> return;
> }
> for (LuceneDocument pushback : pushbacks) {
> try {
>
> writer.addDocument(pushback.getDocument());
> } catch (IOException io) {
> logger.error("failed to add document to
> index:"+io.getMessage(),io);
> } catch (AlreadyClosedException e) {
> pushbacks.add(pushback);
> }
> //documentPool.execute(new
> IndexDocument(pushback,pushbacks));
> i++;
> }
> }
> //documentPool.shutdown();
> //documentPool.awaitTermination(30,TimeUnit.MINUTES);
> } catch (Throwable ie) {
> logger.error("index write
> interrupted:"+ie.getMessage());
> } finally {
> closeIndex();
> indexLock.unlock();
> }
> } }
> public class IndexDocument extends Thread {
> LuceneDocument luceneDocument = null;
> List<LuceneDocument> pushbacks = null;
> public IndexDocument(LuceneDocument
> luceneDocument,List<LuceneDocument> pushbacks) {
> this.luceneDocument = luceneDocument;
> this.pushbacks = pushbacks;
> setName("index document");
> }
> public void run() {
> try {
> writer.addDocument(luceneDocument.getDocument());
> } catch (IOException io) {
> logger.error("failed to add document to
> index:"+io.getMessage(),io);
> } catch (AlreadyClosedException e) {
> pushbacks.add(luceneDocument);
> } catch (Throwable t) {
> logger.error("failed to add document to
> index:"+t.getMessage(),t);
> }
> }};
> }
> protected void closeIndex() {
> try {
> indexLock.lock();
> if (writer!=null) {
> writer.close();
> }
> } catch (Throwable io) {
> logger.error("failed to close index writer:"+io.getMessage(),io);
> } finally {
> logger.debug("writer closed");
> writer = null;
> indexLock.unlock();
> }
> }
> public void deleteIndex() throws MessageSearchException {
> logger.debug("delete index {indexpath='"+indexPath+"'}");
> try {
> indexLock.lock();
> try {
> int maxIndexChars =
> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
> writer = new
> IndexWriter(FSDirectory.getDirectory(indexPath),analyzer,true,new
> IndexWriter.MaxFieldLength(maxIndexChars));
> } catch (Throwable cie) {
> logger.error("failed to delete index
> {index='"+indexPath+"'}",cie);
> return;
> }
> MessageIndex.volumeIndexes.remove(this);
> } finally {
> closeIndex();
> indexLock.unlock();
> }
> }
> public void startup() {
> logger.debug("volumeindex is starting up");
> File lockFile = new File(indexPath+File.separatorChar +
> "write.lock");
> if (lockFile.exists()) {
> logger.warn("The server lock file already exists. Either another
> indexer is running or the server was not shutdown correctly.");
> logger.warn("If it is the latter, the lock file must be manually
> deleted at "+lockFile.getAbsolutePath());
> if (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
> logger.debug("index lock file detected on volumeindex
> startup.");
> } else {
> logger.warn("index lock file detected. the server was
> shutdown incorrectly. automatically deleting lock file.");
> logger.warn("indexer is configured to deal with only one
> indexer process.");
> logger.warn("if you are running more than one indexer, your
> index could be subject to corruption.");
> lockFile.delete();
> }
> }
> indexProcessor = new IndexProcessor();
> indexProcessor.start();
> Runtime.getRuntime().addShutdownHook(this);
> }
> public void shutdown() {
> logger.debug("volumeindex is shutting down");
> queue.add(EXIT_REQ);
> scheduler.shutdownNow();
> }
> @Override
> public void run() {
> queue.add(EXIT_REQ);
> }
> public interface LuceneDocument {
> public String toString();
> public Document getDocument();
> public void finalize();
> }
> }
>
>
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: java-user-unsubscribe [at] lucene
> For additional commands, e-mail: java-user-help [at] lucene


---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscribe [at] lucene
For additional commands, e-mail: java-user-help [at] lucene

Lucene java-user RSS feed   Index | Next | Previous | View Threaded
 
 


Interested in having your list archived? Contact Gossamer Threads
 
  Web Applications & Managed Hosting Powered by Gossamer Threads Inc.