Factory method to create the actual API object
Factory method to create the actual API object
Execute the process workflow represented by this Processor and pass the function the result, if the Processor is nonEmpty.
Execute the process workflow represented by this Processor and pass the function the result, if the Processor is nonEmpty.
the result of the function within a Some if this processor is Non-empty. Otherwise the function will not be executed and None will be returned
Execute the Processor.
Execute the Processor. If the result is an iterator then execute() will visit each element in the iterator to ensure that any processes mapped to that iterator will be executed.
A typical situation where execute is useful is when the Processor is a side effect processor like a Processor created by an OpenOutput or OpenSeekable object. Both typically return Processor[Unit] processors which only perform side-effecting behaviours.
Example:
val process = for { outProcessor <- output.outputProcessor inProcessor <- file.asInput.blocks.processor _ <- inProcessor.repeatUntilEmpty() block <- inProcessor.next _ <- outProcessor.write(block) } yield () // the copy has not yet occurred // will look through each element in the process (and sub-elements if the process contains a LongTraversable) process.execute()
Apply a filter to this processor.
Apply a filter to this processor. If the filter returns false then the resulting Processor will be empty. It is not possible to know if the Processor is empty unless acquireAndGet is called because the filter is not called until acquireOrGet is executed (or the Processor is somehow processed in another way like obtaining the LongTraversable and traversing that object).
A new Processor with the filter applied.
Execute the Processor and pass the result to the function, much like acquireAndGet but does not return a result
Execute the Processor and pass the result to the function, much like acquireAndGet but does not return a result
Map the contents of this Processor to a new Processor with a new value.
Map the contents of this Processor to a new Processor with a new value.
The main use case is so Processor work in for-comprehensions but another useful use case is to convert the value read from a ProcessorAPI to a new value. Suppose the value read was an integer you might use map to convert the contained value to a float.
Declare an error handler for handling an error when executing the processor.
Declare an error handler for handling an error when executing the processor. It is important to realize that this will catch exceptions caused ONLY by the current processor, not by 'child' Processors. IE processors that are executed within a flatmap or map of this processor.
Examples:
for { mainProcessor <- input.bytes.processor // if the read fails 1 will be assigned to first and passed to second as the argument of flatmap first <- mainProcessor.read onFailure {_ => -1} // if this read fails an exception will be thrown that will NOT be caught by the above onFailure method second <- mainProcessor.read } yield (first,second)
to handle errors of groups of processors a composite processor must be created and the error handler added to that:
for { mainProcessor <- input.bytes.processor // define a _composite_ processor containing the sub processor // that need to have error handling groupProcessor = for { first <- mainProcessor.read second <- mainProcessor.read } yield (first,second) // attach the error handler tuple <- groupProcessor onFailure {case t => log(t); None} } yield tuple
To handle all errors in one place the yielded processor can have the error handler attached:
val process = for { mainProcessor <- input.bytes.processor first <- mainProcessor.read second <- mainProcessor.read } yield (first,second) process.onFailure{case _ => log(t); None} process.acquireAndGet(...)
The value that will be returned from the handler. Also the type of the returned processor
a partial function that can handle the exceptions thrown during the execution of the process. If the handler returns a non-empty Option the that value will be used as the value of the processor, If the handler returns None then the processor will be an empty processor If the handler throws an exception... then normal semantics of an exception are exhibitted.
A new processor that will behave the same as this except an error during execution will be handled.
Convert this Processor to a Processor containing an Option.
Convert this Processor to a Processor containing an Option. Methods such as next return a potentially empty Processor which will, when in a for comprehension, will stop the process at that point. Converting the processor to an option allows the process handle continue and simply handle the possibility of one input source being empty while other continue to provide data.
Consider the following example:
for { idsIn <- ids.bytesAsInts.processor attributes <- in.lines().processor _ <- idsIn.repeatUntilEmpty(attributes) id <- ids.next.opt.orElse(NoId) attr <- attributes.next.opt.orElse("") } yield new Record(id,attr)
The above example processes the streams completely even if one ends prematurely.
Convert the Processor into a LongTraversable if A is a subclass of Iterator.
Convert the Processor into a LongTraversable if A is a subclass of Iterator.
Same behavior as for filter.
Same behavior as for filter.
Wraps a CloseableIteratorProcessor, which normally produces a ProcessorAPI object, to a new Processor that returns a ProcessorAPI subclass specific to a data type. For example CharProcessorAPI has the lines() method and ByteProcessorAPI have int, long, float etc... methods
for { byteAPI <- ByteProcessorAPI(in.bytes.processor) littleEndianAPI = byteApi.littleEndianAPI littleEndianInt <- littleEndianAPI.nextInt } yield littleEndianInt
scalax.io.processing.CharProcessorAPI
,scalax.io.processing.ProcessorAPI