scalax.io.processing

ProcessorAPI

class ProcessorAPI[+A] extends AnyRef

The common API for declaring a process workflow on an Input object. This class contains all the operations that can be performed. A ProcessorAPI object essentially wraps a data source/resource that provides elements of type A; its methods allow the developer to read and skip over the elements as needed.

See also the section below about data-specific APIs.

The processing package contains the classes for processing, and ProcessorAPI is the core of the processing API.

The idea behind the processing API is to declare the format in a largely declarative manner and read the contents as needed. The processing API is not really needed when only a single Input object is processed, although it can be slightly easier to read. When multiple Input objects must be processed at the same time and have interdependencies, however, the processing API becomes indispensable.

There are two commonly used patterns: extracting a single item of data from Input objects, and extracting a collection of data elements from Input objects.

Pattern 1: Read a single item of data from Input objects:

val headerProcessor = for {
  api1 <- input.bytes.processor
  api2 <- input2.bytes.processor
  header1 <- api1.take(100)
  header2 <- api2.take(100)
} yield new Header(header1, header2)

headerProcessor.acquireAndGet(header => println(header)) // do something with header

In this use case the format is declared as the for-comprehension and the result is returned by the yield. The headerProcessor value is not executed until acquireAndGet is called; at that point it becomes, in effect, the process workflow for reading the headers. The acquireAndGet method executes the process and passes the resulting object to the function argument.
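To make the deferred execution concrete, here is a minimal, self-contained sketch that does not use scala-io. SketchResource, SketchProcessor, SketchAPI and processorOf are hypothetical stand-ins for an Input, a Processor, a ProcessorAPI and input.bytes.processor. The sketch assumes that a processor is simply a recorded function that only runs, and closes whatever it opened, inside acquireAndGet.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical resource (not scala-io): logs when it is opened and closed.
final class SketchResource(val name: String, data: Vector[Byte], log: ListBuffer[String]) {
  private var pos = 0
  def open(): Unit = { pos = 0; log += s"open $name" }
  def close(): Unit = log += s"close $name"
  def read(n: Int): Vector[Byte] = {
    val chunk = data.slice(pos, pos + n)
    pos += chunk.size
    chunk
  }
}

// Hypothetical stand-in for Processor[A]: a deferred step that receives the
// buffer of resources opened so far. Nothing runs until acquireAndGet.
final class SketchProcessor[A](val run: ListBuffer[SketchResource] => A) {
  def map[B](f: A => B): SketchProcessor[B] =
    new SketchProcessor[B](opened => f(run(opened)))
  def flatMap[B](f: A => SketchProcessor[B]): SketchProcessor[B] =
    new SketchProcessor[B](opened => f(run(opened)).run(opened))
  // Runs the workflow, passes the result to f, then closes the resources
  // in reverse order of opening, even if f throws.
  def acquireAndGet[B](f: A => B): B = {
    val opened = ListBuffer.empty[SketchResource]
    try f(run(opened))
    finally opened.reverseIterator.foreach(_.close())
  }
}

// Hypothetical stand-in for ProcessorAPI[Byte].
final class SketchAPI(res: SketchResource) {
  def take(n: Int): SketchProcessor[Vector[Byte]] =
    new SketchProcessor[Vector[Byte]](_ => res.read(n))
}

// Hypothetical stand-in for `input.bytes.processor`: opening is deferred as well.
def processorOf(res: SketchResource): SketchProcessor[SketchAPI] =
  new SketchProcessor[SketchAPI](opened => { res.open(); opened += res; new SketchAPI(res) })

final case class Header(first: Vector[Byte], second: Vector[Byte])

val log = ListBuffer.empty[String]
val input1 = new SketchResource("input1", Vector.tabulate(200)(_.toByte), log)
val input2 = new SketchResource("input2", Vector.fill(150)(7.toByte), log)

val headerProcessor = for {
  api1 <- processorOf(input1)
  api2 <- processorOf(input2)
  header1 <- api1.take(100)
  header2 <- api2.take(100)
} yield Header(header1, header2)

val openedBeforeRun = log.toList // empty: declaring the workflow opened nothing
val sizes = headerProcessor.acquireAndGet(h => (h.first.size, h.second.size))
// sizes == (100, 100)
```

Declaring headerProcessor leaves the log empty; both resources are opened and then closed (in reverse order) only during the acquireAndGet call.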

Note

repeatUntilEmpty takes parameters so that it can repeat until a whole set of ProcessorAPI objects is empty, rather than just one.

Notice the difference between:

for {
  api1 <- input.bytes.processor
  api2 <- input.chars.processor
  _ <- api1.repeatUntilEmpty()
  _ <- api2.repeatUntilEmpty()
  ...
} ...

and

for {
  api1 <- input.bytes.processor
  api2 <- input.chars.processor
  _ <- api1.repeatUntilEmpty(api2)
  ...
} ....

and

for {
  api1 <- input.bytes.processor
  _ <- api1.repeatUntilEmpty()
  api2 <- input.chars.processor
  _ <- api2.repeatUntilEmpty()
  ...
} ....

In case 1, the process following api2.repeatUntilEmpty() is repeated until api2 is empty, and if api1 is not empty at that point the process never ends. This is because the api1.repeatUntilEmpty() loop keeps repeating, but api2 is already empty, so the api2 loop simply returns without consuming anything from api1.

In case 2, the process repeats until both api1 and api2 are empty.

In case 3, api2 is created and then the process repeats until api2 is empty. Then api1 is checked to see whether it is empty. If it is not, api2 is created again and the process repeats until api2 is again empty, and so on.
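Case 2 can be modelled with plain Scala iterators standing in for the two ProcessorAPI objects. repeatUntilAllEmpty and nextOpt below are hypothetical helpers, not scala-io API; the sketch assumes the loop body reads one element from each source in a way that never produces an empty step (like next.opt).

```scala
// Hypothetical model (not scala-io code) of repeatUntilEmpty(others*):
// keep evaluating `body` while at least one source still has elements.
def repeatUntilAllEmpty[A](sources: Iterator[_]*)(body: => A): Vector[A] = {
  val results = Vector.newBuilder[A]
  while (sources.exists(_.hasNext)) results += body
  results.result()
}

// Models `next.opt`: consumes one element if available and never "fails".
def nextOpt[T](it: Iterator[T]): Option[T] = if (it.hasNext) Some(it.next()) else None

// Case 2: a single loop that watches both sources.
val bytes = Iterator(1, 2, 3)
val chars = Iterator('a', 'b')
val pairs = repeatUntilAllEmpty[(Option[Int], Option[Char])](bytes, chars) {
  (nextOpt(bytes), nextOpt(chars))
}
// pairs == Vector((Some(1), Some('a')), (Some(2), Some('b')), (Some(3), None))
```

The loop runs a third time because bytes still has an element after chars is exhausted, and it stops only once both sources are empty.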

All the normal behaviour of for-comprehensions is supported as well, including guards, pattern matching, etc.

In addition to this class, one can wrap this API with a type-specific API for a particular type A. For example, suppose A is Byte: ByteProcessorAPI can then be used to gain access to APIs that are specific to working with bytes.

When using repeatUntilEmpty(), the steps that follow it must consume data from the ProcessorAPI; otherwise the loop never terminates (because the ProcessorAPI never becomes empty).

There is no method for directly obtaining the result of a process, because of the next use case. As shown there, the repeat and repeatUntilEmpty methods of ProcessorAPI produce a Processor[Iterator[A]], and such an iterator only works while the resources are open. Because of this, all processors must be accessed within a context that correctly closes them after access.

In short, the result of a Processor is accessed within the acquireAndGet method (or via traversable; see the next section).

Pattern 2: Define a repetitive format and obtain a LongTraversable[A] for processing each element defined by the format:

val process = for {
  api1 <- input.bytes.processor
  api2 <- input.chars.processor
  _ <- api1.repeatUntilEmpty()
  binaryData <- api1.take(100)
  attributes <- api2.takeWhile(_ != '\n')
} yield new Record(binaryData, attributes)

process.traversable // do something with traversable of records

In this example, api1 is processed repeatedly until it is empty. The value process is of type Processor[Iterator[Record]] and can be accessed using either acquireAndGet or traversable. acquireAndGet behaves the same way as in Pattern 1, and the iterator is accessed as normal. However, the iterator must not escape acquireAndGet, because it is no longer valid once the resources it depends on are closed. For this reason traversable is often the better option: it returns a LongTraversable that can be used repeatedly without concern about the underlying resources being left open or closed while the traversable is in use.
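The warning about escaping iterators can be illustrated without scala-io. In the hypothetical sketch below, ClosableIterator, acquireAndGet and ReopeningTraversable are illustrative names, not library API: an iterator returned out of acquireAndGet fails once its resource has been closed, while a traversable that reopens the resource on every traversal can be used repeatedly.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

// Hypothetical iterator whose backing "resource" can be closed.
final class ClosableIterator[A](data: Iterator[A]) extends Iterator[A] {
  private var closed = false
  def close(): Unit = closed = true
  private def check(): Unit =
    if (closed) throw new IllegalStateException("resource already closed")
  def hasNext: Boolean = { check(); data.hasNext }
  def next(): A = { check(); data.next() }
}

// Hypothetical acquireAndGet: the iterator is only valid inside f.
def acquireAndGet[A, B](data: Vector[A])(f: Iterator[A] => B): B = {
  val it = new ClosableIterator(data.iterator)
  try f(it) finally it.close()
}

// Hypothetical traversable: every traversal opens, iterates and then closes.
final class ReopeningTraversable[A](open: () => Iterator[A], close: () => Unit) {
  def foreach[U](f: A => U): Unit = {
    val it = open()
    try it.foreach(f) finally close()
  }
  def toVector: Vector[A] = {
    val b = Vector.newBuilder[A]
    foreach(b += _)
    b.result()
  }
}

val sum = acquireAndGet(Vector(1, 2, 3))(it => it.sum) // fine: used inside
val escaped = acquireAndGet(Vector(1, 2, 3))(it => it)  // the iterator escapes
val escapedFails = Try(escaped.hasNext).isFailure       // true: resource is closed

val log = ListBuffer.empty[String]
val records = new ReopeningTraversable[Int](
  () => { log += "open"; Vector(1, 2, 3).iterator },
  () => { log += "close"; () }
)
val firstPass = records.toVector
val secondPass = records.toVector // works again: the resource is reopened
```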

See also

scalax.io.processing.CharProcessorAPI

scalax.io.processing.ProcessorAPI

scalax.io.processing.SpecificApiFactory

Linear Supertypes
AnyRef, Any

Instance Constructors

  1. new ProcessorAPI(iter: CloseableIterator[A], resourceContext: ResourceContext)

Value Members

  1. final def !=(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  2. final def !=(arg0: Any): Boolean

    Definition Classes
    Any
  3. final def ##(): Int

    Definition Classes
    AnyRef → Any
  4. final def ==(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  5. final def ==(arg0: Any): Boolean

    Definition Classes
    Any
  6. final def asInstanceOf[T0]: T0

    Definition Classes
    Any
  7. def clone(): AnyRef

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws()
  8. def drop(i: Int): Processor[Unit]

    Drop/skip the next i elements in the input source if possible.

    Since dropping produces no value, the result can be ignored.

    // results in a Processor[Vector[A]]
    for {
      api <- input.processor
      _ <- api.drop(10)
      seq <- api.take(5)
    } yield seq

    i

    the number of elements to skip

    returns

    the returned Processor can typically be ignored since it is a unit processor.
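    A hypothetical cursor (not scala-io code) that mimics the documented drop/take behaviour on a single shared iterator, including the "if possible" part: dropping past the end simply stops without an error.

```scala
// Hypothetical sketch: one shared iterator, consumed by drop and take.
final class Cursor[A](it: Iterator[A]) {
  // Skips up to i elements; stops early (without error) if the source runs out.
  def drop(i: Int): Unit = {
    var remaining = i
    while (remaining > 0 && it.hasNext) { it.next(); remaining -= 1 }
  }
  // Collects up to i elements into a Vector.
  def take(i: Int): Vector[A] = {
    val b = Vector.newBuilder[A]
    var remaining = i
    while (remaining > 0 && it.hasNext) { b += it.next(); remaining -= 1 }
    b.result()
  }
}

val cursor = new Cursor((1 to 20).iterator)
cursor.drop(10)
val seq = cursor.take(5)  // Vector(11, 12, 13, 14, 15)
cursor.drop(100)          // only 5 elements are left; they are dropped
val rest = cursor.take(5) // Vector()
```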

  9. def end(): Processor[Unit]

    Ends the ProcessorAPI. Any attempt to take or drop has no effect after the process is ended.

    for {
      api <- input.processor
      _ <- api.repeatUntilEmpty()
      seq <- api.take(5)
      if seq contains 1
      _ <- api.end()
    } yield seq

    The example repeatedly takes 5 elements from the input source until a chunk contains the value 1; it then ends the repetition and returns that chunk.

    The resulting Processor is of type Processor[Iterator[Seq[A]]] even though it can contain at most one element, so calling result.traversable.headOption is likely the easiest way to obtain the value from the Processor.

    As a side note, the same result could be obtained with the LongTraversable API: traversable.sliding(5,5).filter(_ contains 1).headOption. headOption is needed because you don't know whether the resulting traversable contains any elements. However, the LongTraversable API would be difficult to use when processing multiple input objects together.
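    The LongTraversable alternative can be tried with the standard library, assuming a plain Vector stands in for the traversable. Because sliding on a Vector returns an Iterator, this sketch uses Iterator.nextOption() (Scala 2.13 or later) where a traversable would use headOption.

```scala
// A Vector stands in for the LongTraversable (assumption for illustration).
val source = Vector(9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 1, 2)

// Chunks of 5; keep the first chunk that contains 1, if any.
val firstChunkWith1 = source.sliding(5, 5).filter(_ contains 1).nextOption()
// Some(Vector(4, 3, 2, 1, 0))

// No chunk contains 1, so the result is None rather than an exception.
val noMatch = Vector(2, 3, 4).sliding(5, 5).filter(_ contains 1).nextOption()
```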

    returns

    a Processor[Unit] and therefore the result can normally be ignored

  10. def endIf(f: ⇒ Boolean): Processor[Unit]

    Ends the process if the predicate returns true. Any attempt to take or drop has no effect after the process is ended.

    for {
      api <- input.processor
      _ <- api.repeatUntilEmpty()
      seq <- api.take(5)
      _ <- api.endIf(seq contains 25)
    } yield seq

    The example repeatedly takes 5 elements from the input until one of the chunks contains the number 25.

    As a side note, this could also be done using the LongTraversable API: traversable.sliding(5,5).takeWhile(i => !(i contains 25)). While this example can be done with the normal traversable API, this API is typically preferred when reading data from multiple interdependent sources.
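    The traversable expression from the side note can be run with the standard library, again with a Vector standing in for the LongTraversable (an assumption for illustration). The standard takeWhile stops before the first chunk that fails the predicate, so the chunk containing 25 is not part of the result.

```scala
// A Vector stands in for the LongTraversable (assumption for illustration).
val numbers = (1 to 40).toVector

// Chunks of 5, kept only while a chunk does not contain 25.
val chunks = numbers.sliding(5, 5).takeWhile(chunk => !(chunk contains 25)).toVector
// Vector(Vector(1..5), Vector(6..10), Vector(11..15), Vector(16..20))
```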

    f

    the predicate; when it evaluates to true, the process is ended

    returns

    a Processor[Unit] and therefore the result can normally be ignored

  11. final def eq(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  12. def equals(arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  13. def finalize(): Unit

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws()
  14. final def getClass(): Class[_]

    Definition Classes
    AnyRef → Any
  15. def hashCode(): Int

    Definition Classes
    AnyRef → Any
  16. final def isInstanceOf[T0]: Boolean

    Definition Classes
    Any
  17. final def ne(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  18. def next: Processor[A]

    Read the next element in the input source if possible.

    If there is an element left in the input source a Processor containing that element will be returned, otherwise the returned processor will be empty.

    Because next returns an empty processor when no element is left, it terminates the looping in a for-comprehension, and the resulting Processor may be empty. This can be a problem when reading from multiple input sources. The opt method can be used to convert the returned processor into a Processor[Option[_]] that is never empty. Consider the following examples.

    Example:

    for {
      api1 <- input1.bytes.processor
      api2 <- input2.bytes.processor
      _ <- api1.repeatUntilEmpty()
      api1Next <- api1.next
      next <- api2.next
    } println(api1Next)

    for {
      api1 <- input1.bytes.processor
      api2 <- input2.bytes.processor
      _ <- api1.repeatUntilEmpty()
      api1Next <- api1.next
      nextOpt <- api2.next.opt
    } println(api1Next)

    The two examples appear similar. The first example prints the bytes from input1 only until input2 is empty, whereas the second example prints all of api1 regardless of whether api2 is empty.

    The reason is that in example 1, next returns an empty processor once input2 is empty, so the println is not executed. In example 2 the Processor is never empty: it contains either Some or None.
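    The difference between next and next.opt can be modelled with Option, where None plays the role of an empty Processor. The helpers next, nextOpt and printed below are hypothetical and only approximate the scala-io behaviour described above; the while loop stands in for api1.repeatUntilEmpty().

```scala
// Models `next`: None plays the role of an empty Processor.
def next[A](it: Iterator[A]): Option[A] = if (it.hasNext) Some(it.next()) else None
// Models `next.opt`: never empty; the inner Option says whether an element existed.
def nextOpt[A](it: Iterator[A]): Option[Option[A]] = Some(next(it))

// Collects what the "println" step would print in example 1 or example 2.
def printed(useOpt: Boolean): Vector[Int] = {
  val input1 = Iterator(1, 2, 3, 4)
  val input2 = Iterator('a', 'b')
  val out = Vector.newBuilder[Int]
  while (input1.hasNext) { // plays the role of api1.repeatUntilEmpty()
    val step: Option[Int] =
      if (useOpt) for { api1Next <- next(input1); _ <- nextOpt(input2) } yield api1Next
      else for { api1Next <- next(input1); _ <- next(input2) } yield api1Next
    step.foreach(out += _) // only non-empty steps reach the "println"
  }
  out.result()
}

val example1 = printed(useOpt = false) // Vector(1, 2)
val example2 = printed(useOpt = true)  // Vector(1, 2, 3, 4)
```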

    returns

    An empty processor if there are no more elements in the input source or a processor containing the next element in the input source.

    Note

    If the process contains a repeatUntilEmpty() call, next.opt should be preferred over next. See repeatUntilEmpty for a more detailed explanation.

  19. final def notify(): Unit

    Definition Classes
    AnyRef
  20. final def notifyAll(): Unit

    Definition Classes
    AnyRef
  21. def processFactory: ProcessorFactory

    Attributes
    protected
  22. def repeat(times: Int, otherProcessorAPIs: ProcessorAPI[_]*): RepeatUntilEmpty

    Loops n times or until the provided input sources are all empty.

    This method is similar to repeatUntilEmpty except it limits the number of repetitions that can be performed.

    times

    the maximum number of loops

    otherProcessorAPIs

    other input sources to monitor; if this ProcessorAPI and all of otherProcessorAPIs are empty, the loop ends early, before times iterations are reached
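    A hypothetical model of repeat (not scala-io's implementation): the body runs at most times iterations and stops early once every monitored source is empty. In this sketch the sources argument includes this ProcessorAPI's own source as well as the others, and takeUpTo is an illustrative helper.

```scala
// Hypothetical model of repeat(times, others*): run `body` at most `times`
// times, stopping early once every source is empty.
def repeatModel[A](times: Int, sources: Iterator[_]*)(body: => A): Vector[A] = {
  val out = Vector.newBuilder[A]
  var count = 0
  while (count < times && sources.exists(_.hasNext)) {
    out += body
    count += 1
  }
  out.result()
}

// Takes up to n elements by calling next() directly on the shared iterator.
def takeUpTo[A](it: Iterator[A], n: Int): List[A] = {
  val buf = List.newBuilder[A]
  var i = 0
  while (i < n && it.hasNext) { buf += it.next(); i += 1 }
  buf.result()
}

// Limited by `times`: only 2 of the possible 4 chunks are read.
val limitedSource = (1 to 10).iterator
val limited = repeatModel[List[Int]](2, limitedSource)(takeUpTo(limitedSource, 3))

// Limited by the source: it empties after 4 chunks, well before 10 loops.
val drainedSource = (1 to 10).iterator
val drained = repeatModel[List[Int]](10, drainedSource)(takeUpTo(drainedSource, 3))
```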

  23. def repeatUntilEmpty(otherProcessorAPIs: ProcessorAPI[_]*): RepeatUntilEmpty

    Creates a Processor that repeats until this processor and all of the other input sources passed in are empty or ended. Each repetition generates an integer that can be used to count the number of repetitions if desired.

    otherProcessorAPIs

    other processors to empty (in addition to this) before ending the loop

    returns

    A Processor containing a sequence of whatever elements are returned by the for-comprehension

    Note

    repeatUntilEmpty can very easily result in infinite loops, because it depends on the subsequent steps of the process/workflow correctly retrieving elements from the input source so that it eventually empties.

    The following examples are ways that one can create infinite loops (or loops that last up to Long.MaxValue):

    for {
      processor1 <- input.bytes.processor
      processor2 <- input.bytes.processor
      processor1Loops <- processor1.repeatUntilEmpty()
        // if processor2 is emptied before processor1 there is an infinite loop because
        // this section will be the loop and since processor1 is not accessed here we have a loop
        // to be safer next1 should be in this section
      processor2Loops <- processor2.repeatUntilEmpty()
      next1 <- processor1.next.opt
      next2 <- processor2.next.opt
    } yield (next1, next2)

    for {
      processor1 <- input.bytes.processor
      processor2 <- input.bytes.processor
      processor1Loops <- processor1.repeatUntilEmpty(processor2)
      next1 <- processor1.next  // next.opt should be used here because this can cause
                                // an infinite loop.  if processor1 is empty and processor2 is not
                                // next produces an empty processor so the next line will not be executed
                                // next.opt would always produce an non-empty Processor and therefore
                                // should be preferred over next
      next2 <- processor2.next
    } yield (next1, next2)

    for {
      processor1 <- input.bytes.processor
      processor2 <- input.bytes.processor
      loops <- processor1.repeatUntilEmpty(processor2)
      if loops < 100  // this guard is dangerous because if there are more than 100 elements in either
                      // processor1 or processor2 there is an infinite loop because next1 and next2 never get called
      next1 <- processor1.next.opt
      next2 <- processor2.next.opt
    } yield (next1, next2)

    A safe implementation using repeatUntilEmpty should execute only methods that produce non-empty Processors, or should be written with extreme care. For example:

    for {
      processor1 <- input.bytes.processor
      processor2 <- input.bytes.processor
      processor1Loops <- processor1.repeatUntilEmpty(processor2)
      option1 <- processor1.next.opt
      option2 <- processor2.next.opt
      next1 <- option1
      next2 <- option2
    } yield (next1, next2)
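    The infinite-loop hazard and the safe pattern can be reproduced with a small Option-based model. next, nextOpt and loopUntilAllEmpty are hypothetical helpers, not scala-io API; loopUntilAllEmpty adds an iteration cap (maxLoops) purely so that the unsafe case returns None instead of hanging.

```scala
// Models `next`: an empty Option plays the role of an empty Processor.
def next[A](it: Iterator[A]): Option[A] = if (it.hasNext) Some(it.next()) else None
// Models `next.opt`: always non-empty, wrapping the (possibly missing) element.
def nextOpt[A](it: Iterator[A]): Option[Option[A]] = Some(next(it))

// Hypothetical repeatUntilEmpty with a safety cap: returns None when the
// sources are still not empty after maxLoops iterations.
def loopUntilAllEmpty[A](maxLoops: Int, sources: Iterator[_]*)(body: => Option[A]): Option[Vector[A]] = {
  val out = Vector.newBuilder[A]
  var loops = 0
  while (loops < maxLoops && sources.exists(_.hasNext)) {
    body.foreach(out += _)
    loops += 1
  }
  if (sources.exists(_.hasNext)) None else Some(out.result())
}

// Unsafe: once p1 is empty, next(p1) short-circuits and p2 is never read again.
val unsafe = {
  val p1 = Iterator(1, 2)
  val p2 = Iterator('a', 'b', 'c')
  loopUntilAllEmpty[(Int, Char)](1000, p1, p2) {
    for { n1 <- next(p1); n2 <- next(p2) } yield (n1, n2)
  }
}

// Safe: both sources are read (via nextOpt) before any step can be empty.
val safe = {
  val p1 = Iterator(1, 2)
  val p2 = Iterator('a', 'b', 'c')
  loopUntilAllEmpty[(Int, Char)](1000, p1, p2) {
    for {
      option1 <- nextOpt(p1)
      option2 <- nextOpt(p2)
      n1 <- option1
      n2 <- option2
    } yield (n1, n2)
  }
}
// unsafe == None; safe == Some(Vector((1, 'a'), (2, 'b')))
```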

  24. val resourceContext: ResourceContext

  25. final def synchronized[T0](arg0: ⇒ T0): T0

    Definition Classes
    AnyRef
  26. def take(i: Int): Processor[Vector[A]]

    Construct a sequence by taking up to i elements from the input source

    i

    the maximum number of elements to take

    returns

    a Processor containing a Vector[A] of the elements taken from the input source

  27. def takeWhile(f: (A) ⇒ Boolean): Processor[Seq[A]]

    Construct a sequence by taking elements from the input source until the function returns false or there are no more elements in the input source.

    f

    the predicate that determines when to stop taking elements

    returns

    a Processor containing a Seq[A] of the elements taken from the input source
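    A hypothetical sketch of take and takeWhile over one shared source, written with plain Scala iterators rather than scala-io. It assumes takeWhile leaves the first non-matching element in the source (via BufferedIterator.head); whether scala-io's takeWhile consumes that element is not stated here.

```scala
// Hypothetical sketch (not scala-io code) of take/takeWhile over one shared source.
final class TakeSketch[A](source: Iterator[A]) {
  // Buffered so takeWhile can inspect the next element without consuming it.
  private val it = source.buffered

  // Up to i elements; fewer if the source runs out.
  def take(i: Int): Vector[A] = {
    val b = Vector.newBuilder[A]
    var remaining = i
    while (remaining > 0 && it.hasNext) { b += it.next(); remaining -= 1 }
    b.result()
  }

  // Elements while f holds; the first failing element stays in the source.
  def takeWhile(f: A => Boolean): Vector[A] = {
    val b = Vector.newBuilder[A]
    while (it.hasNext && f(it.head)) b += it.next()
    b.result()
  }
}

val api = new TakeSketch("key=value\nnext".iterator)
val key = api.takeWhile(_ != '=')    // "key"
val sep = api.take(1)                // Vector('=')
val value = api.takeWhile(_ != '\n') // "value"
val rest = api.take(100)             // "\nnext": only 5 elements were left
```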

  28. def toString(): String

    Definition Classes
    AnyRef → Any
  29. final def wait(): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws()
  30. final def wait(arg0: Long, arg1: Int): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws()
  31. final def wait(arg0: Long): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws()
