Execute the process workflow represented by this Processor and pass the function the result, if the Processor is nonEmpty.
The result of the function wrapped in a Some if this Processor is non-empty. Otherwise the function is not executed and None is returned.
If A is an iterator do not return it since it might not function outside the scope of acquireAndGet.
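One way to respect this restriction is to copy the elements into a strict collection while still inside acquireAndGet. The following is a minimal sketch, assuming scala-io is on the classpath and that acquireAndGet accepts a function and returns an Option as described above; the names DrainExample and drain are hypothetical.

```scala
import scalax.io.processing.Processor

object DrainExample {
  // Hypothetical helper: copies the iterator's elements into a strict List
  // while the underlying resource is still open, so that nothing lazy
  // escapes the scope of acquireAndGet.
  def drain[A](processor: Processor[Iterator[A]]): Option[List[A]] =
    processor.acquireAndGet(iterator => iterator.toList)
}
```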
Execute the Processor. If the result is an iterator then execute() will visit each element in the iterator to ensure that any processes mapped to that iterator will be executed.
A typical situation where execute is useful is when the Processor is a side-effecting processor, such as one created by a scalax.io.processing.OpenOutput or scalax.io.processing.OpenSeekable object. Both typically return Processor[Unit] processors, which only perform side-effecting behaviours.
Example:
    val process = for {
      outProcessor <- output.outputProcessor
      inProcessor <- file.asInput.blocks.processor
      _ <- inProcessor.repeatUntilEmpty()
      block <- inProcessor.next
      _ <- outProcessor.write(block)
    } yield ()

    // the copy has not yet occurred

    // will look through each element in the process (and sub-elements if the process contains a LongTraversable)
    process.execute()
Apply a filter to this processor. If the filter returns false then the resulting Processor will be empty. It is not possible to know whether the Processor is empty until acquireAndGet is called, because the filter is not applied until acquireAndGet is executed (or the Processor is processed in some other way, such as obtaining the LongTraversable and traversing it).
A new Processor with the filter applied.
Execute the Processor and pass the result to the function, much like acquireAndGet, but without returning a result.
Start the execution of the process in another thread and return a future for accessing the result when it is ready.
It is important to realize that if the result of the process is a LongTraversable the processing will not in fact be executed because LongTraversables are non-strict (lazy).
When the Processor's result type is LongTraversable, either call futureExec, which executes all LongTraversables in the process recursively, or obtain the LongTraversable (normally by calling the traversable method) and visit each of its elements. (See scalax.io.LongTraversable.async)
See the futureExec docs for examples of when to use futureExec.
A Future that will return the result of executing the process
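The laziness described above can be observed with a plain scala.Iterator, used here purely as a stand-in for LongTraversable. This is not scala-io code, and the names LazyStandIn and demo are made up for illustration. The sketch shows that mapping a side effect over a non-strict collection performs no work until the elements are visited.

```scala
object LazyStandIn {
  // Returns the number of processed elements before and after traversal.
  def demo(): (Int, Int) = {
    var executed = 0
    // Building the mapped iterator only describes the work; nothing runs yet.
    val pending = Iterator(1, 2, 3).map { n => executed += 1; n * 2 }
    val before = executed
    // Visiting every element is what actually triggers the side effects.
    pending.foreach(_ => ())
    val after = executed
    (before, after)
  }
}
```

In this analogy, future corresponds to obtaining `pending` without traversing it, while futureExec corresponds to also running the `foreach`.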
Call execute asynchronously.
The advantage of this method over future is that it recursively visits each element of a LongTraversable if the Processor contains one.
For example:
    val processor = for {
      in <- in.blocks().processor
      outApi <- out.outputProcessor
      _ <- in.repeatUntilEmpty()
      block <- in.next
      _ <- outApi.write(block)
    } yield ()

    processor.futureExec()
The example illustrates a case where futureExec is desired because (as a result of the repeatUntilEmpty) the contained object is a LongTraversable. If future is called instead, the writes will not be performed, because they occur only when an element of the LongTraversable is visited.
An equivalent way of writing the previous example is:
    val processor2 = for {
      in <- in.blocks().processor
      outApi <- out.outputProcessor
      _ <- in.repeatUntilEmpty()
      block <- in.next
    } yield outApi.asOutput.write(block)

    processor2.futureExec()
a future so one can observe when the process is finished
Map the contents of this Processor to a new Processor with a new value.
The main use case is to allow Processors to work in for-comprehensions, but another useful one is converting the value read from a ProcessorAPI to a new value. For example, if the value read is an integer, map can be used to convert the contained value to a float.
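The integer-to-float conversion mentioned above could be sketched as follows, assuming scala-io is on the classpath; MapExample and asFloat are hypothetical names, and only the map method described here is used.

```scala
import scalax.io.processing.Processor

object MapExample {
  // Nothing is read at this point: map only describes how the value
  // will be converted once the Processor is eventually executed.
  def asFloat(ints: Processor[Int]): Processor[Float] =
    ints.map(_.toFloat)
}
```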
Declare an error handler for handling an error when executing the processor. It is important to realize that this will catch exceptions caused ONLY by the current processor, not by 'child' Processors, i.e. processors that are executed within a flatMap or map of this processor.
Examples:
    for {
      mainProcessor <- input.bytes.processor
      // if the read fails, -1 will be assigned to first and passed to second as the argument of flatMap
      first <- mainProcessor.read onFailure {_ => -1}
      // if this read fails, an exception will be thrown that will NOT be caught by the above onFailure method
      second <- mainProcessor.read
    } yield (first,second)

To handle errors for a group of processors, a composite processor must be created and the error handler added to that:

    for {
      mainProcessor <- input.bytes.processor
      // define a _composite_ processor containing the sub-processors
      // that need to have error handling
      groupProcessor = for {
        first <- mainProcessor.read
        second <- mainProcessor.read
      } yield (first,second)
      // attach the error handler
      tuple <- groupProcessor onFailure {case t => log(t); None}
    } yield tuple
To handle all errors in one place the yielded processor can have the error handler attached:
    val process = for {
      mainProcessor <- input.bytes.processor
      first <- mainProcessor.read
      second <- mainProcessor.read
    } yield (first,second)

    // onFailure returns a new processor, so acquireAndGet must be called on its result
    process.onFailure{case t => log(t); None}.acquireAndGet(...)

The type of the value returned by the handler, which is also the type of the returned processor.
A partial function that can handle the exceptions thrown during the execution of the process. If the handler returns a non-empty Option then that value will be used as the value of the processor. If the handler returns None then the processor will be an empty processor. If the handler throws an exception then the normal semantics of an exception apply.
A new processor that will behave the same as this except an error during execution will be handled.
Convert this Processor to a Processor containing an Option. Methods such as next return a potentially empty Processor which, when used in a for-comprehension, will stop the process at that point. Converting the processor to an Option allows the process to continue and simply handle the possibility of one input source being empty while others continue to provide data.
Consider the following example:
    for {
      idsIn <- ids.bytesAsInts.processor
      attributes <- in.lines().processor
      _ <- idsIn.repeatUntilEmpty(attributes)
      id <- idsIn.next.opt.orElse(NoId)
      attr <- attributes.next.opt.orElse("")
    } yield new Record(id,attr)
The above example processes the streams completely even if one ends prematurely.
Create a modified processor that will throw a java.util.concurrent.TimeoutException if the process takes longer than the provided timeout.
the length of time before the timeout exception is thrown
a new processor with a timeout associated with it
Convert the Processor into a LongTraversable if A is a subclass of Iterator.
Same behavior as for filter.
A point or step in an IO process workflow.
The processing API is a declarative way to process several input streams together. This allows much more powerful options than those available through the normal collections-like API in scalax.io.LongTraversable.
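A minimal sketch of such a process, assuming scala-io is on the classpath and that `longTraversable` is any LongTraversable[Int]. The object and method names are hypothetical; only processor, next and acquireAndGet, all described in this documentation, are relied upon.

```scala
import scalax.io.LongTraversable

object FirstTwoExample {
  def firstTwo(longTraversable: LongTraversable[Int]): Option[(Int, Int)] = {
    // Declares the process: read the first two elements. Nothing runs yet.
    val firstElems = for {
      api    <- longTraversable.processor
      first  <- api.next
      second <- api.next
    } yield (first, second)

    // Opens the resource, runs the process, passes the pair to the function
    // and closes the resource afterwards.
    firstElems.acquireAndGet(pair => pair)
  }
}
```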
Ignore for the moment that the above example is trivial.
The for-comprehension defines the process, but nothing is executed until the result (firstElems) is accessed with acquireAndGet. This is because IO typically accesses resources which must be opened and closed. When acquireAndGet is executed, the Processor[ProcessorAPI[A]] obtained from the longTraversable is opened and the first and second elements are read from the processor (via the ProcessorAPI, which defines the operations permitted for this process pipeline). The result is passed to the function that was passed to acquireAndGet. After the function completes, the process is closed.
For more details on how the processing API is used, see the scalax.io.processing.ProcessorAPI documentation.
It is possible for a Processor to be empty so acquireAndGet returns an Option to support this case. Consider:
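A minimal sketch of such an empty Processor, assuming scala-io is on the classpath and that `longTraversable` is any LongTraversable[Int]; the object and method names are hypothetical:

```scala
import scalax.io.LongTraversable

object EmptyProcessorExample {
  def run(longTraversable: LongTraversable[Int]): Option[Unit] = {
    val never = for {
      api   <- longTraversable.processor
      first <- api.next
      if false // the guard always fails, so the resulting Processor is empty
    } yield first

    // The function is only called when the Processor is non-empty.
    never.acquireAndGet(value => println(value))
  }
}
```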
The if false means the Processor will not contain any values, so the println will not be executed and None will be returned from acquireAndGet.
the type of the object that will be the result of running this Processor
scalax.io.processing.ProcessorAPI