scalax.io.processing

OutputProcessor

class OutputProcessor extends Processor[OpenOutput]

A processor that opens an Output Resource allowing for batch processing. In other words it permits multiple write operations to be performed on the same opened output. The OpenOutput class has essentially the same API as Output except that each method returns a Processor instead of Unit.

There is an important difference between returning a Processor and returning Unit. Because the methods of OpenOutput return a Processor, nothing is written until that processor is executed.
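
The deferred behaviour can be illustrated with a minimal sketch that does not use scalax.io at all. Deferred, write and the written buffer are hypothetical stand-ins (assumptions for illustration, not the library's Processor or OpenOutput). The only point is that building the for-comprehension records nothing until execute() is called.

```scala
import scala.collection.mutable.ListBuffer

object DeferredWriteSketch {
  // Hypothetical stand-in for a Processor: it only describes work to be done.
  final class Deferred[A](run: () => A) {
    def map[B](f: A => B): Deferred[B] = new Deferred[B](() => f(run()))
    def flatMap[B](f: A => Deferred[B]): Deferred[B] =
      new Deferred[B](() => f(run()).execute())
    def execute(): A = run()
  }

  // Returns (contents before execute, contents after execute).
  def demo(): (List[String], List[String]) = {
    // Hypothetical sink recording what has actually been written.
    val written = ListBuffer.empty[String]

    // Returns a description of the write; nothing is appended yet.
    def write(s: String): Deferred[Unit] =
      new Deferred[Unit](() => { written += s; () })

    val process = for {
      _ <- write("hi")
      _ <- write("123")
    } yield ()

    val before = written.toList // the process is only defined at this point
    process.execute()           // both writes run now, in order
    (before, written.toList)
  }
}
```

In this sketch DeferredWriteSketch.demo() returns an empty list for the state before execute() and List("hi", "123") for the state after it.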

Examples:

The following example is useful when one needs to do several output operations together without closing the output object.

for {
  outProcessor <- Resource.fromOutputStream(stream).outputProcessor
  out = outProcessor.asOutput
} {
  out.write("hi")
  out.write(123)
}

This example is interesting because it illustrates that the methods of outProcessor are NOT the same as those of an Output object. To get the expected results when writing within the for-comprehension body, the outputProcessor must be converted to a normal Output object, and the write actions are performed on that object.

The following example shows how to read data from two inputs and write it to a single output:

val process = for {
  in1 <- inResource1.lines.processor
  in2 <- inResource2.lines.processor
  out <- Resource.fromFile("file").outputProcessor
  _ <- in1.repeatUntilEmpty(in2)
  line1 <- in1.nextOption
  line2 <- in2.nextOption
  line = line1 orElse line2
  _ <- out.write(line)
  _ <- out.write("\n")
} yield ()

// At this point the process has not been executed, just defined
process.execute // execute process

Since the methods in OpenOutput return Processor objects, the calls can be interleaved within a process definition as follows:

val process = for {
  in1 <- inResource1.lines.processor
  in2 <- inResource2.lines.processor
  out <- Resource.fromFile("file").outputProcessor
  _ <- in1.repeatUntilEmpty(in2)
  line1 <- in1.nextOption
  _ <- out.write(line1)
  line2 <- in2.nextOption
  _ <- out.write(line2)
} yield out.asOutput.write("\n")

// At this point the process has not been executed, just defined
process.execute // execute process

Notice that the write in the yield expression of this example converts the outputProcessor to an Output object before writing; otherwise the write would not be executed.

Linear Supertypes
Processor[OpenOutput], AnyRef, Any

Instance Constructors

  1. new OutputProcessor (resource: OutputResource[WritableByteChannel])

    resource

    the resource to use in the process

Value Members

  1. def != (arg0: AnyRef): Boolean

    Attributes
    final
    Definition Classes
    AnyRef
  2. def != (arg0: Any): Boolean

    Attributes
    final
    Definition Classes
    Any
  3. def ## (): Int

    Attributes
    final
    Definition Classes
    AnyRef → Any
  4. def == (arg0: AnyRef): Boolean

    Attributes
    final
    Definition Classes
    AnyRef
  5. def == (arg0: Any): Boolean

    Attributes
    final
    Definition Classes
    Any
  6. def acquireAndGet [U] (f: (OpenOutput) ⇒ U): Option[U]

    Execute the process workflow represented by this Processor and pass the function the result, if the Processor is nonEmpty.

    returns

    the result of the function within a Some if this processor is non-empty; otherwise the function will not be executed and None will be returned

    Definition Classes
    Processor
  7. def asInstanceOf [T0] : T0

    Attributes
    final
    Definition Classes
    Any
  8. def clone (): AnyRef

    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws()
  9. def context : ResourceContext

    Definition Classes
    OutputProcessor → Processor
  10. def eq (arg0: AnyRef): Boolean

    Attributes
    final
    Definition Classes
    AnyRef
  11. def equals (arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  12. def execute (): Unit

    Execute the Processor. If the result is an iterator then execute() will visit each element in the iterator to ensure that any processes mapped to that iterator will be executed.

    A typical situation where execute is useful is when the Processor is a side effect processor like a Processor created by an OpenOutput or OpenSeekable object. Both typically return Processor[Unit] processors which only perform side-effecting behaviours.

    Example:

    val process = for {
      outProcessor <- output.outputProcessor
      inProcessor <- file.asInput.blocks.processor
      _ <- inProcessor.repeatUntilEmpty()
      block <- inProcessor.next
      _ <- outProcessor.write(block)
    } yield ()
    
    // the copy has not yet occurred
    
    // will look through each element in the process (and sub-elements
    // if the process contains a LongTraversable)
    process.execute()
    
    Definition Classes
    Processor
  13. def filter (f: (OpenOutput) ⇒ Boolean): Processor[OpenOutput]

    Apply a filter to this processor. If the filter returns false then the resulting Processor will be empty. It is not possible to know whether the Processor is empty unless acquireAndGet is called, because the filter is not called until acquireAndGet is executed (or the Processor is processed in some other way, such as obtaining the LongTraversable and traversing that object).
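
    The laziness described above can be sketched with a toy model that is independent of scalax.io. Proc is a hypothetical stand-in that treats a processor as a thunk producing an Option (an assumption for illustration, not the library's implementation). The predicate is counted to show that it only runs once acquireAndGet is called.

```scala
object LazyFilterSketch {
  // Hypothetical stand-in: running the processor yields Some(value) or None (empty).
  final class Proc[A](run: () => Option[A]) {
    // Only wraps the thunk; the predicate is not evaluated here.
    def filter(p: A => Boolean): Proc[A] = new Proc[A](() => run().filter(p))
    // Runs the thunk and applies f if a value is present.
    def acquireAndGet[U](f: A => U): Option[U] = run().map(f)
  }

  // Returns (predicate calls before running, result, predicate calls after running).
  def demo(): (Int, Option[Int], Int) = {
    var predicateCalls = 0
    val source = new Proc[Int](() => Some(5))
    val filtered = source.filter { n => predicateCalls += 1; n > 10 }
    val callsBefore = predicateCalls            // still 0: nothing has run yet
    val result = filtered.acquireAndGet(_ * 2)  // predicate rejects 5, so this is None
    (callsBefore, result, predicateCalls)
  }
}
```

    In this sketch LazyFilterSketch.demo() returns (0, None, 1): the predicate is first evaluated inside acquireAndGet, and the rejected value leaves the processor empty.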

    returns

    A new Processor with the filter applied.

    Definition Classes
    Processor
  14. def finalize (): Unit

    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws()
  15. def flatMap [U] (f: (OpenOutput) ⇒ Processor[U]): Processor[U]

    Definition Classes
    Processor
  16. def foreach [U] (f: (OpenOutput) ⇒ U): Unit

    Execute the Processor and pass the result to the function, much like acquireAndGet but does not return a result

    Definition Classes
    Processor
  17. def getClass (): java.lang.Class[_]

    Attributes
    final
    Definition Classes
    AnyRef → Any
  18. def hashCode (): Int

    Definition Classes
    AnyRef → Any
  19. def init : Opened[OpenOutput] { def execute(): Some[scalax.io.processing.OpenOutput] }

    Definition Classes
    OutputProcessor → Processor
  20. def isInstanceOf [T0] : Boolean

    Attributes
    final
    Definition Classes
    Any
  21. def map [U] (f: (OpenOutput) ⇒ U): Processor[U]

    Map the contents of this Processor to a new Processor with a new value.

    The main use case is to make Processors work in for-comprehensions, but another useful use case is converting the value read from a ProcessorAPI to a new value. For example, if the value read is an integer, map can be used to convert the contained value to a float.

    Definition Classes
    Processor
  22. def ne (arg0: AnyRef): Boolean

    Attributes
    final
    Definition Classes
    AnyRef
  23. def notify (): Unit

    Attributes
    final
    Definition Classes
    AnyRef
  24. def notifyAll (): Unit

    Attributes
    final
    Definition Classes
    AnyRef
  25. def onFailure [U >: OpenOutput] (handler: PartialFunction[Throwable, Option[U]]): Processor[U]

    Declare an error handler for handling an error when executing the processor. It is important to realize that this will catch exceptions caused ONLY by the current processor, not by 'child' Processors, i.e. processors that are executed within a flatMap or map of this processor.

    Examples:

    for {
      mainProcessor <- input.bytes.processor
      // if the read fails -1 will be assigned to first and passed to second as the argument of flatMap
      first <- mainProcessor.read onFailure {case _ => Some(-1)}
      // if this read fails an exception will be thrown that will NOT be caught by the above onFailure method
      second <- mainProcessor.read
    } yield (first,second)
    

    To handle errors of groups of processors, a composite processor must be created and the error handler added to that:

    for {
      mainProcessor <- input.bytes.processor
      // define a _composite_ processor containing the sub-processors
      // that need to have error handling
      groupProcessor = for {
        first <- mainProcessor.read
        second <- mainProcessor.read
      } yield (first,second)
      // attach the error handler
      tuple <- groupProcessor onFailure {case t => log(t); None}
    } yield tuple
    

    To handle all errors in one place the yielded processor can have the error handler attached:

    val process = for {
      mainProcessor <- input.bytes.processor
      first <- mainProcessor.read
      second <- mainProcessor.read
    } yield (first,second)
    
    // onFailure returns a new processor, so keep the result and run that one
    val handled = process.onFailure{case t => log(t); None}
    
    handled.acquireAndGet(...)
    
    U

    The value that will be returned from the handler. Also the type of the returned processor

    handler

    a partial function that can handle the exceptions thrown during the execution of the process. If the handler returns a non-empty Option, then that value will be used as the value of the processor. If the handler returns None, then the processor will be an empty processor. If the handler throws an exception, then the normal semantics of an exception are exhibited.

    returns

    A new processor that will behave the same as this except an error during execution will be handled.
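
    The handler semantics can be sketched with a toy model that is independent of scalax.io. Proc is a hypothetical stand-in whose onFailure only mirrors the signature shown above (an assumption for illustration, not the library's implementation). The sketch covers a handler that returns Some, a handler that returns None, and a handler that is not defined for the exception.

```scala
import java.io.IOException
import scala.util.Try
import scala.util.control.NonFatal

object OnFailureSketch {
  // Hypothetical stand-in: running the processor yields Some(value) or None (empty).
  final class Proc[A](run: () => Option[A]) {
    def onFailure[U >: A](handler: PartialFunction[Throwable, Option[U]]): Proc[U] =
      new Proc[U](() => {
        try run()
        // Exceptions the handler is not defined for are rethrown unchanged.
        catch { case NonFatal(t) if handler.isDefinedAt(t) => handler(t) }
      })
    def acquireAndGet[U](f: A => U): Option[U] = run().map(f)
  }

  // Returns (value from Some handler, value from None handler, did the unhandled case throw).
  def demo(): (Option[Int], Option[Int], Boolean) = {
    val failing = new Proc[Int](() => throw new IOException("read failed"))
    // Handler returns Some: that value becomes the processor's value.
    val replaced = failing.onFailure[Int] { case _: IOException => Some(-1) }.acquireAndGet(n => n)
    // Handler returns None: the processor becomes empty.
    val emptied = failing.onFailure[Int] { case _: IOException => None }.acquireAndGet(n => n)
    // Handler does not match: the exception propagates as usual.
    val propagated =
      Try(failing.onFailure[Int] { case _: IllegalStateException => None }.acquireAndGet(n => n)).isFailure
    (replaced, emptied, propagated)
  }
}
```

    In this sketch OnFailureSketch.demo() returns (Some(-1), None, true).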

    Definition Classes
    Processor
  26. def opt : Processor[Option[OpenOutput]]

    Convert this Processor to a Processor containing an Option. Methods such as next return a potentially empty Processor which, when used in a for-comprehension, will stop the process at that point. Converting the processor to an option allows the process to continue and simply handle the possibility of one input source being empty while others continue to provide data.

    Consider the following example:

    for {
      idsIn <- ids.bytesAsInts.processor
      attributes <- in.lines().processor
      _ <- idsIn.repeatUntilEmpty(attributes)
      id <- idsIn.next.opt.orElse(NoId)
      attr <- attributes.next.opt.orElse("")
    } yield new Record(id,attr)
    

    The above example processes the streams completely even if one ends prematurely.
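
    The effect of opt on a for-comprehension can be sketched with a toy model that is independent of scalax.io. Proc and acquire are hypothetical stand-ins (assumptions for illustration, not the library's implementation). An empty step stops the strict comprehension, while the same step wrapped with opt lets it continue with a default.

```scala
object OptSketch {
  // Hypothetical stand-in: running the processor yields Some(value) or None (empty).
  final class Proc[A](run: () => Option[A]) {
    def map[B](f: A => B): Proc[B] = new Proc[B](() => run().map(f))
    def flatMap[B](f: A => Proc[B]): Proc[B] =
      new Proc[B](() => run().flatMap(a => f(a).acquire()))
    // An empty processor becomes a non-empty one that contains None.
    def opt: Proc[Option[A]] = new Proc[Option[A]](() => Some(run()))
    def acquire(): Option[A] = run()
  }

  // Returns (result without opt, result with opt).
  def demo(): (Option[(Int, String)], Option[(Int, String)]) = {
    val id = new Proc[Int](() => Some(7))
    val attr = new Proc[String](() => None) // this source is already exhausted

    // The empty attr step stops the whole comprehension.
    val strict = for { i <- id; a <- attr } yield (i, a)
    // With opt the comprehension continues and a default is supplied.
    val lenient = for { i <- id; a <- attr.opt } yield (i, a.getOrElse(""))

    (strict.acquire(), lenient.acquire())
  }
}
```

    In this sketch OptSketch.demo() returns (None, Some((7, ""))).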

    Definition Classes
    Processor
  27. def processFactory : ProcessorFactory

    Attributes
    protected
    Definition Classes
    Processor
  28. def synchronized [T0] (arg0: ⇒ T0): T0

    Attributes
    final
    Definition Classes
    AnyRef
  29. def toString (): String

    Definition Classes
    AnyRef → Any
  30. def traversable [B] (implicit transformer: ProcessorTransformer[B, OpenOutput, LongTraversable[B]]): LongTraversable[B]

    Convert the Processor into a LongTraversable if A is a subclass of Iterator.

    Definition Classes
    Processor
    Annotations
    @implicitNotFound( ... )
  31. def wait (): Unit

    Attributes
    final
    Definition Classes
    AnyRef
    Annotations
    @throws()
  32. def wait (arg0: Long, arg1: Int): Unit

    Attributes
    final
    Definition Classes
    AnyRef
    Annotations
    @throws()
  33. def wait (arg0: Long): Unit

    Attributes
    final
    Definition Classes
    AnyRef
    Annotations
    @throws()
  34. def withFilter (f: (OpenOutput) ⇒ Boolean): Processor[OpenOutput]

    Same behavior as for filter.

    Definition Classes
    Processor

Inherited from Processor[OpenOutput]

Inherited from AnyRef

Inherited from Any