scalax.io.processing

OutputProcessor

class OutputProcessor extends Processor[OpenOutput]

A processor that opens an Output Resource allowing for batch processing. In other words it permits multiple write operations to be performed on the same opened output. The OpenOutput class has essentially the same API as Output except that each method returns a Processor instead of Unit.

There is an important difference between returning a Processor and returning Unit. Because the methods of OpenOutput return a Processor, the writing is not done until the processor is executed.
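The deferred behaviour described above can be sketched with a toy model. The Deferred class, the in-memory sink and the write helper below are hypothetical stand-ins invented for illustration; they are not part of scalax.io and make no claim about how Processor is actually implemented.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy stand-in for a Processor: it only describes work until execute() runs.
// This is NOT the scalax.io implementation, just an illustration of the idea.
final class Deferred[A](run: () => A) {
  def map[B](f: A => B): Deferred[B] = new Deferred(() => f(run()))
  def flatMap[B](f: A => Deferred[B]): Deferred[B] =
    new Deferred(() => f(run()).execute())
  def execute(): A = run()
}

object DeferredWriteDemo {
  // Returns (was the sink empty before execute?, sink contents after execute).
  def demo(): (Boolean, List[String]) = {
    // Hypothetical in-memory sink standing in for an opened output.
    val sink = ArrayBuffer.empty[String]

    // Analogue of an OpenOutput-style write: describes the write, performs nothing yet.
    def write(s: String): Deferred[Unit] = new Deferred(() => { sink += s; () })

    val process: Deferred[Unit] = for {
      _ <- write("hi")
      _ <- write("123")
    } yield ()

    val emptyBefore = sink.isEmpty // the process is only defined at this point
    process.execute()              // now the writes actually happen, in order
    (emptyBefore, sink.toList)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

In this sketch, defining the for-comprehension performs no writes; only the call to execute() does.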

Examples:

The following example is useful when one needs to do several output operations together without closing the output object.

for {
  outProcessor <- Resource.fromOutputStream(stream).outputProcessor
  out = outProcessor.asOutput
} {
  out.write("hi")
  out.write(123)
}

This example is interesting because it illustrates that the methods of outProcessor are NOT the same as those of an Output object. To get the expected results when writing within the for-comprehension body, the outputProcessor must be converted to a normal Output object and the write actions performed on that.

The following example shows how to read data from two inputs and write it to a single output:

val process = for {
  in1 <- inResource1.lines.processor
  in2 <- inResource2.lines.processor
  out <- Resource.fromFile("file").outputProcessor
  _ <- in1.repeatUntilEmpty(in2)
  line1 <- in1.nextOption
  line2 <- in2.nextOption
  line = line1 orElse line2
  _ <- out.write(line)
  _ <- out.write("\n")
} yield ()

// At this point the process has not been executed, just defined
process.execute // execute process

Because the methods in OpenOutput return Processor objects, calls to them can be interleaved within a process definition as follows:

val process = for {
  in1 <- inResource1.lines.processor
  in2 <- inResource2.lines.processor
  out <- Resource.fromFile("file").outputProcessor
  _ <- in1.repeatUntilEmpty(in2)
  line1 <- in1.nextOption
  _ <- out.write(line1)
  line2 <- in2.nextOption
  _ <- out.write(line2)
} yield out.asOutput.write("\n")

// At this point the process has not been executed, just defined
process.execute // execute process

In this example, notice that the write in the for-comprehension body converts the outputProcessor to an Output object before writing; otherwise the write would not be executed.

Linear Supertypes
Processor[OpenOutput], AnyRef, Any

Instance Constructors

  1. new OutputProcessor(resource: OutputResource[WritableByteChannel])

    resource

    the resource to use in the process

Value Members

  1. final def !=(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  2. final def !=(arg0: Any): Boolean

    Definition Classes
    Any
  3. final def ##(): Int

    Definition Classes
    AnyRef → Any
  4. final def ==(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  5. final def ==(arg0: Any): Boolean

    Definition Classes
    Any
  6. def acquireAndGet[U](f: (OpenOutput) ⇒ U): Option[U]

    Execute the process workflow represented by this Processor and pass the function the result, if the Processor is nonEmpty.

    returns

    the result of the function within a Some if this processor is Non-empty. Otherwise the function will not be executed and None will be returned

    Definition Classes
    Processor
    Note

    If A is an iterator do not return it since it might not function outside the scope of acquireAndGet.
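
    The note about not returning an iterator can be illustrated with a toy model. ToyResource and this stand-alone acquireAndGet are hypothetical and written only for this sketch; they mimic the open, apply, close shape described above and are not the scalax.io implementation.

```scala
object ScopedIteratorDemo {
  // Hypothetical resource whose elements are only readable while it is open.
  final class ToyResource(data: List[Int]) {
    private var open = true
    def iterator: Iterator[Int] = data.iterator.map { x =>
      if (!open) throw new IllegalStateException("resource already closed")
      x
    }
    def close(): Unit = { open = false }
  }

  // Analogue of acquireAndGet: open, apply f, always close, wrap the result.
  def acquireAndGet[U](data: List[Int])(f: Iterator[Int] => U): Option[U] = {
    val r = new ToyResource(data)
    try Some(f(r.iterator)) finally r.close()
  }

  def main(args: Array[String]): Unit = {
    // Safe: the iterator is fully consumed inside the function.
    println(acquireAndGet(List(1, 2, 3))(_.sum))
    // Unsafe: the iterator escapes and is read after the resource is closed.
    val leaked = acquireAndGet(List(1, 2, 3))(it => it).get
    println(scala.util.Try(leaked.next()).isFailure)
  }
}
```

    Consuming the iterator inside the function is safe in this model, while the leaked iterator fails on its first read.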

  7. final def asInstanceOf[T0]: T0

    Definition Classes
    Any
  8. def clone(): AnyRef

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws()
  9. def context: ResourceContext

    Definition Classes
    OutputProcessor → Processor
  10. final def eq(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  11. def equals(arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  12. def execute(): Unit

    Execute the Processor. If the result is an iterator then execute() will visit each element in the iterator to ensure that any processes mapped to that iterator will be executed.

    A typical situation where execute is useful is when the Processor is a side-effect processor, such as a Processor created by a scalax.io.processing.OpenOutput or scalax.io.processing.OpenSeekable object. Both typically return Processor[Unit] processors which only perform side-effecting behaviours.

    Example:

    val process = for {
      outProcessor <- output.outputProcessor
      inProcessor <- file.asInput.blocks.processor
      _ <- inProcessor.repeatUntilEmpty()
      block <- inProcessor.next
      _ <- outProcessor.write(block)
    } yield ()
    
    // the copy has not yet occurred
    
    // will look through each element in the process (and sub-elements
    // if the process contains a LongTraversable)
    process.execute()
    Definition Classes
    Processor
  13. def filter(f: (OpenOutput) ⇒ Boolean): Processor[OpenOutput]

    Apply a filter to this processor. If the filter returns false then the resulting Processor will be empty. It is not possible to know whether the Processor is empty unless acquireAndGet is called, because the filter is not called until acquireAndGet is executed (or the Processor is processed in some other way, such as obtaining the LongTraversable and traversing that object).
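
    The lazy evaluation of the filter can be sketched with a toy type. Maybe is a hypothetical class invented for this illustration (it is not the scalax.io Processor); it only shows a predicate that is stored by filter and first invoked by acquireAndGet.

```scala
object LazyFilterDemo {
  // Toy possibly-empty deferred value; NOT the scalax.io Processor.
  final class Maybe[A](run: () => Option[A]) {
    // Only records the predicate; nothing is evaluated here.
    def filter(p: A => Boolean): Maybe[A] = new Maybe[A](() => run().filter(p))
    // Evaluation (including the predicate) happens here.
    def acquireAndGet[U](f: A => U): Option[U] = run().map(f)
  }

  // Returns (predicate calls before acquireAndGet, result, predicate calls after).
  def demo(): (Int, Option[Int], Int) = {
    var predicateCalls = 0
    val filtered = new Maybe[Int](() => Some(10)).filter { x =>
      predicateCalls += 1
      x > 100
    }
    val callsBefore = predicateCalls
    val result = filtered.acquireAndGet(_ + 1)
    (callsBefore, result, predicateCalls)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

    In this model the emptiness of the filtered value is only discovered once acquireAndGet runs.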

    returns

    A new Processor with the filter applied.

    Definition Classes
    Processor
  14. def finalize(): Unit

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws()
  15. def flatMap[U](f: (OpenOutput) ⇒ Processor[U]): Processor[U]

    Definition Classes
    Processor
  16. def foreach[U](f: (OpenOutput) ⇒ U): Unit

    Execute the Processor and pass the result to the function, much like acquireAndGet but does not return a result

    Definition Classes
    Processor
  17. def future: Future[Option[OpenOutput]]

    Start the execution of the process in another thread and return a future for accessing the result when it is ready.

    It is important to realize that if the result of the process is a LongTraversable the processing will not in fact be executed because LongTraversables are non-strict (lazy).

    In the case where the Processor has a resulting type of LongTraversable, one must execute either futureExec which will execute all LongTraversables recursively in the process or obtain the LongTraversable (normally by calling the traversable method) and visit each element in the LongTraversable. (See scalax.io.LongTraversable.async)

    See futureExec docs for examples when to use futureExec.
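
    The effect of a lazy result inside a future can be reproduced with the standard library alone. The sketch below uses a plain Iterator as a stand-in for a LongTraversable (an assumption made for illustration) and shows that completing the Future does not by itself run the side effects.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object LazyFutureDemo {
  // Returns (writes seen after only awaiting the future, writes after traversing).
  def demo(): (Int, Int) = {
    val writes = new AtomicInteger(0)
    // The Future completes with a lazy Iterator; mapping it performs no writes yet.
    val fut: Future[Iterator[Int]] = Future {
      Iterator(1, 2, 3).map { block => writes.incrementAndGet(); block }
    }
    val lazyResult = Await.result(fut, 5.seconds)
    val before = writes.get()
    lazyResult.foreach(_ => ()) // visiting each element triggers the side effects
    (before, writes.get())
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

    Visiting every element, which is what the text above attributes to futureExec, is what makes the side effects happen in this model.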

    returns

    A Future that will return the result of executing the process

    Definition Classes
    Processor
  18. def futureExec(): Future[Unit]

    Call execute asynchronously.

    The advantage of this method over future is that it will recursively visit each element of a LongTraversable if the Processor contains a LongTraversable.

    For Example:

    val processor = for {
      in <- in.blocks().processor
      outApi <- out.outputProcessor
      _ <- in.repeatUntilEmpty()
      block <- in.next
      _ <- outApi.write(block)
    } yield ()
    
    processor.futureExec()

    The example illustrates a case where futureExec is desired because (as a result of the repeatUntilEmpty) the contained object is a LongTraversable. If future is executed the write will not be executed because the writes occur only when an element in the LongTraversable is visited.

    An equivalent way of writing the previous example is:

    val processor2 = for {
      in <- in.blocks().processor
      outApi <- out.outputProcessor
      _ <- in.repeatUntilEmpty()
      block <- in.next
    } yield outApi.asOutput.write(block)
    
    processor2.futureExec()
    returns

    a future so one can observe when the process is finished

    Definition Classes
    Processor
  19. final def getClass(): Class[_]

    Definition Classes
    AnyRef → Any
  20. def hashCode(): Int

    Definition Classes
    AnyRef → Any
  21. def init: Opened[OpenOutput] { def execute(): Some[scalax.io.processing.OpenOutput] }

    Definition Classes
    OutputProcessor → Processor
  22. final def isInstanceOf[T0]: Boolean

    Definition Classes
    Any
  23. def map[U](f: (OpenOutput) ⇒ U): Processor[U]

    Map the contents of this Processor to a new Processor with a new value.

    The main use case is to make Processors work in for-comprehensions, but another use case is converting the value read from a ProcessorAPI to a new value. For example, if the value read was an integer, you might use map to convert the contained value to a float.

    Definition Classes
    Processor
  24. final def ne(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  25. final def notify(): Unit

    Definition Classes
    AnyRef
  26. final def notifyAll(): Unit

    Definition Classes
    AnyRef
  27. def onFailure[U >: OpenOutput](handler: PartialFunction[Throwable, Option[U]]): Processor[U]

    Declare an error handler for handling an error when executing the processor. It is important to realize that this will catch exceptions caused ONLY by the current processor, not by 'child' Processors, i.e. processors that are executed within a flatMap or map of this processor.

    Examples:

    for {
      mainProcessor <- input.bytes.processor
      // if the read fails, -1 will be assigned to first and passed to second as the argument of flatMap
      first <- mainProcessor.read onFailure {case _ => Some(-1)}
      // if this read fails an exception will be thrown that will NOT be caught by the above onFailure method
      second <- mainProcessor.read
    } yield (first,second)

    To handle errors of groups of processors, a composite processor must be created and the error handler added to that:

    for {
      mainProcessor <- input.bytes.processor
      // define a _composite_ processor containing the sub-processors
      // that need to have error handling
      groupProcessor = for {
        first <- mainProcessor.read
        second <- mainProcessor.read
      } yield (first,second)
      // attach the error handler
      tuple <- groupProcessor onFailure {case t => log(t); None}
    } yield tuple

    To handle all errors in one place the yielded processor can have the error handler attached:

    val process = for {
      mainProcessor <- input.bytes.processor
      first <- mainProcessor.read
      second <- mainProcessor.read
    } yield (first,second)
    
    // onFailure returns a new processor, so keep the result and execute that
    val handled = process.onFailure {case t => log(t); None}
    
    handled.acquireAndGet(...)
    U

    The value that will be returned from the handler. Also the type of the returned processor

    handler

    a partial function that can handle the exceptions thrown during the execution of the process. If the handler returns a non-empty Option, that value will be used as the value of the processor. If the handler returns None, the processor will be an empty processor. If the handler throws an exception, the normal semantics of an exception apply.
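
    The three handler outcomes described above can be sketched with scala.util.Try. The runWithHandler helper is hypothetical and exists only in this illustration; it models a single computation and makes no claim about how Processor.onFailure is implemented.

```scala
import scala.util.{Failure, Success, Try}

object OnFailureDemo {
  // Toy analogue: run a computation and, on failure, consult the handler.
  // Some(v) -> v becomes the value; None -> empty result;
  // an exception the handler does not cover is rethrown unchanged.
  def runWithHandler[A](body: => A)(handler: PartialFunction[Throwable, Option[A]]): Option[A] =
    Try(body) match {
      case Success(v)                           => Some(v)
      case Failure(t) if handler.isDefinedAt(t) => handler(t)
      case Failure(t)                           => throw t
    }

  def main(args: Array[String]): Unit = {
    // Handler supplies a replacement value.
    println(runWithHandler("abc".toInt) { case _: NumberFormatException => Some(-1) })
    // Handler returns None, so the result is empty.
    println(runWithHandler("abc".toInt) { case _: NumberFormatException => None })
    // No failure: the handler is never consulted.
    println(runWithHandler("7".toInt) { case _ => None })
  }
}
```

    If the handler itself throws, the exception simply propagates out of runWithHandler in this model.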

    returns

    A new processor that will behave the same as this except an error during execution will be handled.

    Definition Classes
    Processor
  28. def opt: Processor[Option[OpenOutput]]

    Convert this Processor to a Processor containing an Option. Methods such as next return a potentially empty Processor which, when used in a for-comprehension, will stop the process at that point. Converting the processor to an option allows the process to continue and simply handle the possibility of one input source being empty while others continue to provide data.

    Consider the following example:

    for {
      idsIn <- ids.bytesAsInts.processor
      attributes <- in.lines().processor
      _ <- idsIn.repeatUntilEmpty(attributes)
      id <- idsIn.next.opt.orElse(NoId)
      attr <- attributes.next.opt.orElse("")
    } yield new Record(id,attr)

    The above example processes the streams completely even if one ends prematurely.
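
    The difference between an empty step and a step that carries an Option can be sketched with plain Option values standing in for possibly-empty Processors. This is an analogy only; pairStrict, pairLenient and the default values -1 and "" are hypothetical names and choices made for this illustration.

```scala
object OptDemo {
  // Without opt: an empty step stops the whole comprehension.
  def pairStrict(id: Option[Int], attr: Option[String]): Option[(Int, String)] =
    for {
      i <- id
      a <- attr
    } yield (i, a)

  // With the opt idea: each step is always non-empty and carries an Option,
  // so the comprehension continues and defaults are applied afterwards.
  def pairLenient(id: Option[Int], attr: Option[String]): Option[(Int, String)] =
    for {
      i <- Some(id)
      a <- Some(attr)
    } yield (i.getOrElse(-1), a.getOrElse(""))

  def main(args: Array[String]): Unit = {
    println(pairStrict(Some(1), None))  // the missing attribute empties the result
    println(pairLenient(Some(1), None)) // a record is still produced with a default
  }
}
```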

    Definition Classes
    Processor
  29. def processFactory: ProcessorFactory

    Attributes
    protected
    Definition Classes
    Processor
  30. final def synchronized[T0](arg0: ⇒ T0): T0

    Definition Classes
    AnyRef
  31. def timeout(timeout: Duration): TimingOutProcessor[OpenOutput]

    Create a modified processor that will throw a java.util.concurrent.TimeoutException if the process takes longer than the provided timeout.

    timeout

    the length of time before the timeout exception is thrown

    returns

    a new processor with a timeout associated with it
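
    The timeout behaviour can be approximated with the standard library. The runWithTimeout helper below is hypothetical and is not how TimingOutProcessor works internally; it only shows a computation that raises java.util.concurrent.TimeoutException when it exceeds the given limit.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TimeoutDemo {
  // Run body on another thread and stop waiting for it after `limit`.
  // Await.result throws TimeoutException if the limit is exceeded.
  def runWithTimeout[A](limit: FiniteDuration)(body: => A): A =
    Await.result(Future(body), limit)

  // Returns (value of a fast task, whether a slow task hit the timeout).
  def demo(): (Int, Boolean) = {
    val fast = runWithTimeout(2.seconds)(42)
    val slowTimedOut =
      try { runWithTimeout(50.millis) { Thread.sleep(1000); 1 }; false }
      catch { case _: TimeoutException => true }
    (fast, slowTimedOut)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

    Note that in this sketch the slow task keeps running in the background after the caller has given up waiting.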

    Definition Classes
    Processor
  32. def toString(): String

    Definition Classes
    AnyRef → Any
  33. def traversable[B](implicit transformer: ProcessorTransformer[B, OpenOutput, LongTraversable[B]]): LongTraversable[B]

    Convert the Processor into a LongTraversable if A is a subclass of Iterator.

    Definition Classes
    Processor
    Annotations
    @implicitNotFound( ... )
  34. final def wait(): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws()
  35. final def wait(arg0: Long, arg1: Int): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws()
  36. final def wait(arg0: Long): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws()
  37. def withFilter(f: (OpenOutput) ⇒ Boolean): Processor[OpenOutput]

    Same behavior as for filter.

    Definition Classes
    Processor
