Platform Explorer / Nuxeo Platform 11.3

Bundle org.nuxeo.runtime.stream

In bundle group org.nuxeo.runtime

Documentation

  • README.md

    nuxeo-runtime-stream

    About

    This module provides an integration of nuxeo-stream with Nuxeo. It adds 2 services:

    • a Kafka configuration service: to register Kafka and Zookeeper access and consumer producer properties.
    • a Stream service: to define LogManager configuration, initialize Log and start StreamProcessor.

    Kafka Configurations

    You can register one or multiple Kafka configurations using this Nuxeo extension point:

    <?xml version="1.0"?>
    <component name="my.project.kafka.contrib">
      <extension target="org.nuxeo.runtime.stream.kafka.service" point="kafkaConfig">
        <kafkaConfig name="default" zkServers="localhost:2181" topicPrefix="nuxeo-">
          <producer>
            <property name="bootstrap.servers">localhost:9092</property>
          </producer>
          <consumer>
            <property name="bootstrap.servers">localhost:9092</property>
            <property name="request.timeout.ms">65000</property>
            <property name="max.poll.interval.ms">60000</property>
            <property name="session.timeout.ms">20000</property>
            <property name="heartbeat.interval.ms">1000</property>
            <property name="max.poll.records">50</property>
          </consumer>
        </kafkaConfig>
      </extension>
    </component>
    

    This Kafka configuration named default can be used in the Log configuration below.

    Make sure you have read the nuxeo-stream README to setup properly Kafka.

    Stream Service

    This service enable to define Log configurations and to register stream processors.

    The Log configuration

    There are 2 types of Log configurations:

    • Chronicle: limited for single node: all producers and consumers must be on the same node.
    • Kafka: required for distributed producers and consumers.

    You can define a Log configuration with the following Nuxeo extension point:

    <?xml version="1.0"?>
    <component name="my.project.stream.log.contrib">
      <extension target="org.nuxeo.runtime.stream.service" point="logConfig">
        <!-- Chronicle impl, default storage under default directory, default retention -->
        <logConfig name="default" />
        <!-- Chronicle impl, storage in /tmp/imp, a week of retention -->
        <logConfig name="import" type="chronicle">
          <option name="directory">imp</option>
          <option name="basePath">/tmp</option>
          <option name="retention">7d</option>
        </logConfig>
        <!-- Chronicle impl, default storage and retention,
             create a Log named myStream with 7 partitions if it does not exist. -->
        <logConfig name="custom">
          <log name="myStream" size="7" />
        </logConfig>
        <!-- Kafka impl, referencing the default Kafka config -->
        <logConfig name="work" type="kafka">
          <option name="kafkaConfig">default</option>
        </logConfig>
        <!-- Kafka impl,
             create a Log named pubSub if it does not exist. -->
        <logConfig name="nuxeo" type="kafka">
          <option name="kafkaConfig">default</option>
          <log name="pubSub" size="1" />
        </logConfig>
    
    
      </extension>
    </component>
    

    The default Log type is Chronicle.

    The default retention for Chronicle is 4 days, this can be changed using theĀ nuxeo.conf option: nuxeo.stream.chronicle.retention.duration.

    The retention value is expressed as a string like: 12h or 7d, respectively for 12 hours and 7 days.

    The default storage for Chronicle is: ${nuxeo.data.dir}/data/stream, this path can be changed using the nuxeo.conf option: nuxeo.stream.chronicle.dir.

    Using Log from Nuxeo

    The Nuxeo Stream service enables to get and share access to LogManager:

      StreamService service = Framework.getService(StreamService.class);
      LogManager manager = service.getLogManager("custom");
      // write a record to myStream, the log exists because it has been initialized by the service
      LogAppender<Record> appender = manager.getAppender("myStream");
      appender.append(key, Record.of(key, value.getBytes()));
    
      // read
      try (LogTailer<Record> tailer = manager.createTailer("myGroup", "myStream")) {
          LogRecord<Record> logRecord = tailer.read(Duration.ofSeconds(1));
          assertEquals(key, logRecord.message().key);
      }
      // don't close the manager, this is done by the service
    

    Stream processing

    It is possible to register stream processors, this way they are initialized and started with Nuxeo.

    The extension point refer to a class that returns the topology of computations, the settings are configurable in the contribution. Also the retry policy can be set to a specific computation or for all using default as policy name, the delay between retries, exponentially backing off to the maxDelay and multiplying successive delays by a 2 factor the maxRetries sets the max number of retries to perform. If continueOnFailure is true then the computation will checkpoint the record in error and continue processing new record. The default policy when unspecified is no retry and abort on failure (continueOnFailure is false).

    <?xml version="1.0"?>
    <component name="my.project.stream.stream.contrib">
      <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <streamProcessor name="myStreamProcessor" logConfig="default" defaultConcurrency="4" defaultPartitions="12"
          class="org.nuxeo.runtime.stream.tests.MyStreamProcessor">
          <stream name="output" partitions="1" />
          <computation name="myComputation" concurrency="8" />
          <policy name="myComputation" continueOnFailure="false" maxRetries="3" delay="500ms" maxDelay="10s" />
        </streamProcessor>
      </extension>
    </component>
    

    Following Project QA Status

    Build Status

    About Nuxeo

    Nuxeo dramatically improves how content-based applications are built, managed and deployed, making customers more agile, innovative and successful. Nuxeo provides a next generation, enterprise ready platform for building traditional and cutting-edge content oriented applications. Combining a powerful application development environment with SaaS-based tools and a modular architecture, the Nuxeo Platform and Products provide clear business value to some of the most recognizable brands including Verizon, Electronic Arts, Sharp, FICO, the U.S. Navy, and Boeing. Nuxeo is headquartered in New York and Paris. More information is available at www.nuxeo.com.

Resolution Order

[783, 792]
The resolution order represents the order in which components have been resolved by the Nuxeo Runtime framework. This range represents the minimal and maximal orders for this bundle's components.
You can influence this order by adding "require" tags in the component declaration, to make sure it is resolved after another component. It will also impact the order in which contributions are registered on their target extension point (see "Registration Order" on contributions).

Components

Maven Artifact

Filenuxeo-runtime-stream-11.3.56.jar
Group Idorg.nuxeo.runtime
Artifact Idnuxeo-runtime-stream
Version11.3.56

Manifest

Manifest-Version: 1.0
Archiver-Version: Plexus Archiver
Created-By: Apache Maven
Built-By: root
Build-Jdk: 11.0.8
Bundle-ManifestVersion: 1
Bundle-Version: 11.3.56-t20201006-172650
Bundle-Name: Nuxeo Runtime Stream
Bundle-SymbolicName: org.nuxeo.runtime.stream;singleton:=true
Bundle-Vendor: Nuxeo
Nuxeo-Component: OSGI-INF/kafka-config-service.xml,OSGI-INF/stream-ser
 vice.xml,OSGI-INF/stream-management-contrib.xml,OSGI-INF/avro-service
 .xml,OSGI-INF/codec-service.xml,OSGI-INF/avro-contrib.xml

Exports