Drop/skip the next i elements in the input source if possible.
Since dropping produces no value, the result can be ignored.
    // results in a Processor[Seq[A]]
    for {
      api <- input.processor
      _   <- api.drop(10)
      seq <- api.take(5)
    } yield seq
the number of elements to skip
the returned Processor can typically be ignored since it is a unit processor.
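A minimal sketch of the drop-then-take sequence above, assuming a plain Scala `Iterator` as the input source. `Cursor` is a hypothetical stand-in for a ProcessorAPI, not a scala-io type: it reads immediately, whereas the real API returns Processors that only run when the process is executed.

```scala
// Hypothetical stand-in for a ProcessorAPI over elements of type A.
// Unlike scala-io, every call reads from the source immediately.
final class Cursor[A](source: Iterator[A]) {
  // Skip up to i elements; like drop, the Unit result can be ignored.
  def drop(i: Int): Unit = {
    var remaining = i
    while (remaining > 0 && source.hasNext) {
      source.next()
      remaining -= 1
    }
  }

  // Take up to i elements; fewer are returned if the source runs out.
  def take(i: Int): Seq[A] = {
    val taken = Vector.newBuilder[A]
    var remaining = i
    while (remaining > 0 && source.hasNext) {
      taken += source.next()
      remaining -= 1
    }
    taken.result()
  }
}

// Roughly: for { api <- input.processor; _ <- api.drop(10); seq <- api.take(5) } yield seq
val cursor = new Cursor((1 to 20).iterator)
cursor.drop(10)
val seq = cursor.take(5)

// Dropping past the end is harmless: the following take is simply empty.
val short = new Cursor((1 to 3).iterator)
short.drop(10)
val nothingLeft = short.take(5)
```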
Ends the ProcessorAPI. Any attempts to take or drop will have no effect after the process is ended.
    for {
      api <- input.processor
      _   <- api.repeatUntilEmpty()
      seq <- api.take(5)
      if (seq contains 1)
      _   <- api.end
    } yield seq
This example repeatedly takes 5 elements from the input source until a group contains the value 1, then ends the repetition and returns that group.
The resulting Processor will be of type Processor[Iterator[Seq[A]]] even though it can contain at most one element, so calling result.traversable.headOption is likely the easiest way to obtain the value from the Processor.
As a side note, this could also be done with a LongTraversable: traversable.sliding(5,5).filter(_ contains 1).headOption. headOption is needed because you don't know whether the resulting traversable contains any elements. However, the LongTraversable API would be difficult to use if you want to process multiple input objects together.
a Processor[Unit] and therefore the result can normally be ignored
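The collection-based alternative in the side note can be tried with the standard Scala collections. This sketch uses an ordinary `Seq` in place of a LongTraversable (an assumption so it runs without scala-io); since `sliding` returns an `Iterator`, the matches are converted with `toList` before calling `headOption`.

```scala
// Stand-in for input.traversable; a plain Seq replaces LongTraversable here.
val elements: Seq[Int] = Seq(7, 8, 9, 10, 11, 12, 1, 2, 3, 4)

// Split into non-overlapping groups of 5 and keep the first group containing 1.
// headOption is needed because there may be no matching group at all.
val firstWithOne: Option[Seq[Int]] =
  elements.sliding(5, 5).filter(_ contains 1).toList.headOption

// With no 1 in the input the result is None rather than an exception.
val noMatch: Option[Seq[Int]] =
  Seq(2, 3, 4).sliding(5, 5).filter(_ contains 1).toList.headOption
```

`find(_ contains 1)` on the sliding iterator would give the same result in a single call.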
Ends the process if the predicate returns true. Any attempts to take or drop will have no effect after the process is ended.
    for {
      api <- input.processor
      _   <- api.repeatUntilEmpty()
      seq <- api.take(5)
      _   <- api.endIf(_ contains 25)
    } yield seq
The example takes 5 elements of the input at a time until one of the sequences contains the number 25.
As a side note, this could be done using the LongTraversable API: traversable.sliding(5,5).takeWhile(i => !(i contains 25)). While this example can be done with the normal traversable API, the ProcessorAPI is typically preferred when reading data from multiple interdependent sources.
the predicate that determines whether the process should end
a Processor[Unit] and therefore the result can normally be ignored
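The LongTraversable alternative mentioned for endIf can likewise be sketched with standard Scala collections, again using a plain `Seq` in place of the input's traversable (an assumption made only so the snippet runs). Note that `takeWhile` stops before the first group containing 25, so that group is not part of the result.

```scala
// Stand-in for input.traversable: the numbers 1 to 40 in a plain Seq.
val elements: Seq[Int] = 1 to 40

// Non-overlapping groups of 5, stopping at the first group that contains 25.
// takeWhile excludes that group, so 21..25 is not in the result.
val beforeTwentyFive: List[Seq[Int]] =
  elements.sliding(5, 5).takeWhile(group => !(group contains 25)).toList
```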
Read the next element in the input source if possible.
If there is an element left in the input source, a Processor containing that element will be returned; otherwise the returned processor will be empty.
Because next returns an empty processor once the input source is exhausted, it short-circuits the rest of the for-comprehension and the resulting Processor may be empty. This can be a problem when reading from multiple input sources. The opt method can be used to modify the returned processor so that it is a Processor[Option[A]] that is never empty. Consider the following examples:
Example:
    for {
      api1     <- input1.bytes.processor
      api2     <- input2.bytes.processor
      _        <- api1.repeatUntilEmpty()
      api1Next <- api1.next
      next     <- api2.next
    } println(api1Next)

    for {
      api1     <- input1.bytes.processor
      api2     <- input2.bytes.processor
      _        <- api1.repeatUntilEmpty()
      api1Next <- api1.next
      nextOpt  <- api2.next.opt
    } println(api1Next)
The two examples look similar, but the first prints the bytes from input1 only until input2 is empty, whereas the second prints all of api1 regardless of whether api2 is empty.
The reason is that in example 1, next returns an empty processor once input2 is empty, so the println is not executed. In example 2 the Processor is never empty: it contains either Some or None.
An empty processor if there are no more elements in the input source or a processor containing the next element in the input source.
If the process contains a repeatUntilEmpty() call, next.opt should be preferred over next. See repeatUntilEmpty for a more detailed description of why.
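To see why the two examples behave differently, here is a sketch that models an empty Processor as `None` and a non-empty one as `Some`, using plain Scala `Option` for-comprehensions. `next`, `nextOpt`, `withNext` and `withNextOpt` are hypothetical helpers written for illustration, not scala-io methods; the `while` loop models `api1.repeatUntilEmpty()`, which only watches input1.

```scala
// Hypothetical model: an empty Processor is None, a non-empty one is Some.
// `next` yields the next element, or None once the source is exhausted.
def next[A](source: Iterator[A]): Option[A] =
  if (source.hasNext) Some(source.next()) else None

// `next.opt` is modelled as wrapping that result, so it is never empty.
def nextOpt[A](source: Iterator[A]): Option[Option[A]] = Some(next(source))

// Example 1: loop while input1 has elements (like api1.repeatUntilEmpty()).
// Once input2 is empty, next(input2) is None and the step yields nothing.
def withNext(input1: Iterator[Int], input2: Iterator[Int]): Seq[Int] = {
  val printed = Vector.newBuilder[Int]
  while (input1.hasNext) {
    val step = for {
      api1Next <- next(input1)
      _        <- next(input2)
    } yield api1Next
    step.foreach(printed += _)
  }
  printed.result()
}

// Example 2: identical, except input2 is read with nextOpt, so every step yields.
def withNextOpt(input1: Iterator[Int], input2: Iterator[Int]): Seq[Int] = {
  val printed = Vector.newBuilder[Int]
  while (input1.hasNext) {
    val step = for {
      api1Next <- next(input1)
      _        <- nextOpt(input2)
    } yield api1Next
    step.foreach(printed += _)
  }
  printed.result()
}

val example1 = withNext(Iterator(1, 2, 3, 4, 5), Iterator(10, 20))
val example2 = withNextOpt(Iterator(1, 2, 3, 4, 5), Iterator(10, 20))
```

In both versions input1 is still consumed on every iteration, so the loop ends; only what gets "printed" differs.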
Loops n times or until the provided input sources are all empty.
This method is similar to repeatUntilEmpty except it limits the number of repetitions that can be performed.
the maximum number of loops
other input sources to monitor. If this ProcessorAPI and all of otherProcessorAPIs are empty, the loop ends before n repetitions
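The bounded loop can be sketched with plain iterators. `repeatAtMost` and `read` are hypothetical helpers, not the scala-io signatures: the loop runs the body at most `n` times, stops early once every monitored source is empty, and in this sketch passes each iteration a zero-based repetition index.

```scala
// Hypothetical helper: run `body` at most n times, stopping early once every
// monitored source is empty. The body receives the zero-based repetition index.
def repeatAtMost[A](n: Int, sources: Iterator[_]*)(body: Int => Option[A]): Seq[A] = {
  val results = Vector.newBuilder[A]
  var i = 0
  while (i < n && sources.exists(_.hasNext)) {
    body(i).foreach(results += _)
    i += 1
  }
  results.result()
}

// Hypothetical read helper: one element, or None if the source is empty.
def read[A](source: Iterator[A]): Option[A] =
  if (source.hasNext) Some(source.next()) else None

// The limit wins: only 3 of the 100 elements are read.
val large = Iterator.range(0, 100)
val limited = repeatAtMost(3, large)(_ => read(large))

// Emptiness wins: the loop stops after 2 repetitions even though n is 10.
val small = Iterator(7, 8)
val early = repeatAtMost(10, small)(i => read(small).map(_ + i))
```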
Create a Processor that simply repeats until this processor and all of the other input sources that are passed in are empty or ended. Each repetition generates an integer that can be used to count the number of repetitions if desired.
other processors to empty (in addition to this) before ending the loop
A Processor containing a sequence of whatever elements are returned by the for-comprehension
repeatUntilEmpty can very easily result in infinite loops because it depends on the subsequent steps of the process/workflow retrieving elements from the input sources so that they eventually become empty.
The following examples are ways that one can create infinite loops (or loops that last up to Long.MaxValue):
    for {
      processor1      <- input.bytes.processor
      processor2      <- input.bytes.processor
      processor1Loops <- processor1.repeatUntilEmpty()
      // If processor2 is emptied before processor1 there is an infinite loop,
      // because this section is the body of the outer loop and processor1 is
      // not accessed here. To be safer, next1 should be read in this section.
      processor2Loops <- processor2.repeatUntilEmpty()
      next1           <- processor1.next.opt
      next2           <- processor2.next.opt
    } yield (next1, next2)

    for {
      processor1      <- input.bytes.processor
      processor2      <- input.bytes.processor
      processor1Loops <- processor1.repeatUntilEmpty(processor2)
      next1           <- processor1.next
      // next.opt should be used here because this can cause an infinite loop:
      // if processor1 is empty and processor2 is not, next produces an empty
      // processor, so the next line is not executed. next.opt always produces
      // a non-empty Processor and therefore should be preferred over next.
      next2           <- processor2.next
    } yield (next1, next2)

    for {
      processor1 <- input.bytes.processor
      processor2 <- input.bytes.processor
      loops      <- processor1.repeatUntilEmpty(processor2)
      if loops < 100
      // This guard is dangerous: if processor1 or processor2 contains more than
      // 100 elements there is an infinite loop, because once loops reaches 100
      // next1 and next2 are never called and the processors never empty.
      next1      <- processor1.next.opt
      next2      <- processor2.next.opt
    } yield (next1, next2)

A safe implementation using repeatUntilEmpty should only execute methods that produce non-empty Processors, or should be written with extreme care:

    for {
      processor1      <- input.bytes.processor
      processor2      <- input.bytes.processor
      processor1Loops <- processor1.repeatUntilEmpty(processor2)
      option1         <- processor1.next.opt
      option2         <- processor2.next.opt
      next1           <- option1
      next2           <- option2
    } yield (next1, next2)
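The safe pattern can be modelled with plain Scala iterators and `Option`. `pairUp` and `readOpt` are hypothetical helpers, not scala-io methods: each iteration reads from both sources, so at least one of them shrinks every time and the loop is guaranteed to terminate, and a pair is produced only when both reads succeed, as with `next1 <- option1` and `next2 <- option2`.

```scala
// Hypothetical model of `processor.next.opt`: always returns a value, which is
// Some(element), or None when the source is empty.
def readOpt[A](source: Iterator[A]): Option[A] =
  if (source.hasNext) Some(source.next()) else None

// Models repeatUntilEmpty(processor2) followed by next.opt on both sources.
def pairUp[A, B](in1: Iterator[A], in2: Iterator[B]): Seq[(A, B)] = {
  val pairs = Vector.newBuilder[(A, B)]
  // Loop until both sources are empty. Every iteration reads from both, so at
  // least one non-empty source shrinks and the loop always terminates.
  while (in1.hasNext || in2.hasNext) {
    val option1 = readOpt(in1)
    val option2 = readOpt(in2)
    // Like `next1 <- option1; next2 <- option2`: a pair only when both exist.
    for (next1 <- option1; next2 <- option2) pairs += ((next1, next2))
  }
  pairs.result()
}

val zipped = pairUp(Iterator(1, 2, 3), Iterator("a", "b"))
```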
Construct a sequence by taking up to i elements from the input source
the maximum number of elements to take
a Seq[A] consisting of the elements taken from the input source
Construct a sequence by taking elements from the input source until the function returns false or there are no more elements in the input source.
the predicate that determines when to stop taking elements
a Seq[A] consisting of the elements taken from the input source
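A sketch of takeWhile against a plain `BufferedIterator`, whose `head` lets the predicate inspect the next element without consuming it. `takeWhileFrom` is hypothetical, and leaving the first non-matching element in the source is a design choice of this sketch; the description above does not say whether scala-io's takeWhile consumes that element.

```scala
// Hypothetical takeWhile over a BufferedIterator. `head` peeks at the next
// element without consuming it, so the first element that fails the
// predicate stays in the source (a choice made for this sketch).
def takeWhileFrom[A](source: BufferedIterator[A])(p: A => Boolean): Seq[A] = {
  val taken = Vector.newBuilder[A]
  while (source.hasNext && p(source.head)) taken += source.next()
  taken.result()
}

val source = Iterator(1, 2, 3, 10, 4).buffered
val small = takeWhileFrom(source)(_ < 5) // stops at 10
val rest = source.toList                 // 10 was not consumed
```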
The common API for declaring a process workflow on an Input object. This class contains all the operations that can be performed. A ProcessorAPI object essentially wraps a data source/resource that provides elements of type A; its methods allow the developer to read and skip over the elements as needed.
Also see the section below about data specific APIs
The processing package contains classes for processing and the ProcessorAPI is the core of the Processing API.
The concept behind the processing API is to declare the format in a largely declarative manner and read the contents as needed. The processing API is largely unnecessary when only a single input object is processed, although it can still be slightly easier to read. When multiple input objects must be processed at the same time and have interdependencies, however, the processing API becomes indispensable.
There are two commonly used patterns: extract a single item of data from Input objects and extract a collection of data elements from Input objects.
Pattern 1: Read a single item of data from Input objects:
In this use case the format is declared as the for-comprehension and the result is returned by the yield. The val headerProcessor is not executed until acquireAndGet is called; at that point it essentially is the process workflow for reading the headers. The acquireAndGet method executes the process and passes the resulting object to the function argument.
repeatUntilEmpty takes parameters so that it can repeat until a set of ProcessorAPI objects are empty rather than just one.
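The deferred-execution idea behind Pattern 1 can be sketched without scala-io. `FakeResource`, `DeferredProcess` and `openInput` are hypothetical: defining the process opens nothing, and `acquireAndGet` opens the resource, runs the workflow, passes the result to the function and closes the resource afterwards, even if the function throws. The "header" format (a count followed by that many values) is invented for illustration.

```scala
// Hypothetical resource standing in for an opened Input.
final class FakeResource(data: Seq[Int]) extends AutoCloseable {
  var closed = false
  val elements: Iterator[Int] = data.iterator
  def close(): Unit = closed = true
}

// Hypothetical deferred process: building it runs nothing; acquireAndGet opens
// the resource, runs the workflow, hands the result to f and always closes.
final class DeferredProcess[A](open: () => FakeResource, workflow: Iterator[Int] => A) {
  def acquireAndGet[B](f: A => B): B = {
    val resource = open()
    try f(workflow(resource.elements))
    finally resource.close()
  }
}

// Records every resource opened, so we can check when it happens.
var opened = List.empty[FakeResource]
def openInput(): FakeResource = {
  val resource = new FakeResource(Seq(3, 100, 200, 300, 9, 9))
  opened = resource :: opened
  resource
}

// Invented "header" format: a count followed by that many values.
val headerProcess = new DeferredProcess[List[Int]](
  () => openInput(),
  (it: Iterator[Int]) => {
    val count = it.next()
    it.take(count).toList
  }
)

val openedBeforeRun = opened.size // nothing has been opened yet
val headers = headerProcess.acquireAndGet(values => values)
```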
Notice the difference between:
and
and
In case 1 the process following api2.repeatUntilEmpty will be repeated until api2 is empty, and if api1 is not empty the process will never end. This is because api1 keeps repeating, but api2 is already empty, so its repeatUntilEmpty simply returns.
In case 2 the process will repeat until both api1 and api2 are empty.
In case 3 api2 will be created, then the process will repeat until api2 is empty. Then api1 will be checked to see whether it is empty. If it is not, api2 will be created again and the process will be repeated until api2 is again empty, and so on.
All the normal behaviour of for-comprehensions are supported as well, including guards, pattern matching etc...
In addition to this class, one can wrap this API with a type-specific API for a particular type A. For example, if A is Byte, ByteProcessorAPI can be used to gain access to APIs that are specific to working with bytes.
Note: when using repeatUntilEmpty(), the following lines must consume data from the ProcessorAPI; otherwise the loop will never terminate (because the ProcessorAPI will never be empty).
Note: there is no method for directly obtaining the result of the process because of the next use case. As you will see, the ProcessorAPI has repeat methods (such as repeatUntilEmpty) that result in a Processor[Iterator[A]], and in this case the iterator only works while the resources are open. Because of this, all processors must be accessed within a context where they will be correctly closed after accessing.
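A sketch of the wrapping idea, assuming a plain Scala `Iterator[Byte]` as the source. `Cursor` and `ByteCursor` are hypothetical stand-ins, not ByteProcessorAPI or its real methods: the wrapper delegates to the generic cursor and adds an operation, here reading a big-endian 32-bit integer, that only makes sense for bytes.

```scala
// Hypothetical generic cursor standing in for ProcessorAPI[A].
final class Cursor[A](source: Iterator[A]) {
  // Take up to i elements; fewer if the source runs out.
  def take(i: Int): Seq[A] = {
    val taken = Vector.newBuilder[A]
    var remaining = i
    while (remaining > 0 && source.hasNext) {
      taken += source.next()
      remaining -= 1
    }
    taken.result()
  }
}

// Hypothetical byte-specific wrapper in the spirit of ByteProcessorAPI: it
// delegates to the generic cursor and adds operations specific to bytes.
final class ByteCursor(underlying: Cursor[Byte]) {
  // Reads a big-endian 32-bit Int, or None if fewer than 4 bytes remain
  // (in that case the leftover bytes are still consumed in this sketch).
  def nextInt(): Option[Int] = {
    val bytes = underlying.take(4)
    if (bytes.size < 4) None
    else Some(bytes.foldLeft(0)((acc, b) => (acc << 8) | (b & 0xff)))
  }
}

val bytes = new ByteCursor(new Cursor(Iterator[Byte](0, 0, 1, 2, -1)))
val first = bytes.nextInt()  // bytes 00 00 01 02
val second = bytes.nextInt() // only one byte was left
```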
In short, accessing the result of a Processor is done within the acquireAndGet method (or via traversable, see next section).
Pattern 2: Define a repetitive format and obtain a LongTraversable[A] for processing each element defined by the format:
In this example the entire api1 is processed until it is empty. The value process is of type Processor[Iterator[Record]] and can be accessed using either acquireAndGet or traversable. acquireAndGet behaves the same way as in Pattern 1 and the iterator is accessed as normal. However, the iterator must not escape the acquireAndGet call, because it is no longer valid once the resources it depends on are closed. traversable is therefore often the better option: it returns a LongTraversable that can be used repeatedly without concern about the underlying resources being left open or closed while the traversable is in use.
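The difference between letting an iterator escape and using a re-opening traversable can be sketched without scala-io. `FakeResource`, `RecordProcess` and `ReopeningTraversable` are hypothetical: the fake resource's iterator throws once the resource is closed, and the traversable analogue acquires and closes a fresh resource on every traversal, so it stays usable after any single pass.

```scala
import scala.util.Try

// Hypothetical resource whose iterator only works while the resource is open.
final class FakeResource(data: Seq[String]) extends AutoCloseable {
  private var isOpen = true
  private val underlying = data.iterator
  val records: Iterator[String] = new Iterator[String] {
    def hasNext: Boolean = { require(isOpen, "resource closed"); underlying.hasNext }
    def next(): String = { require(isOpen, "resource closed"); underlying.next() }
  }
  def close(): Unit = isOpen = false
}

// Hypothetical repeating process: each run opens a fresh resource.
final class RecordProcess(open: () => FakeResource) {
  def acquireAndGet[B](f: Iterator[String] => B): B = {
    val resource = open()
    try f(resource.records)
    finally resource.close()
  }
}

// Hypothetical analogue of `traversable`: every traversal acquires and closes
// its own resource, so the object can be reused after any single traversal.
final class ReopeningTraversable(process: RecordProcess) {
  def foreach(f: String => Unit): Unit = process.acquireAndGet(_.foreach(f))
  def toList: List[String] = {
    val buffer = List.newBuilder[String]
    foreach(buffer += _)
    buffer.result()
  }
}

val process = new RecordProcess(() => new FakeResource(Seq("a", "b", "c")))

// Unsafe: the iterator escapes acquireAndGet after its resource was closed.
val escaped = process.acquireAndGet(records => records)
val escapedFails = Try(escaped.hasNext).isFailure

// Safe: the traversable re-opens the resource on every pass.
val records = new ReopeningTraversable(process)
val firstPass = records.toList
val secondPass = records.toList
```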
scalax.io.processing.CharProcessorAPI
scalax.io.processing.ProcessorAPI
scalax.io.processing.SpecificApiFactory