- Practical Real-time Data Processing and Analytics
- Shilpi Saxena Saurabh Gupta
- 739字
- 2025-04-04 18:19:00
Apache NiFi
Apache NiFi is the tool to read from the source and distribute data across different types of sinks. There are multiple types of source and sink connectors available. Download NiFi Version 1.1.1 from https://archive.apache.org/dist/nifi/1.1.1/nifi-1.1.1-bin.tar.gz to your local machine. Once the nifi-1.1.1-bin.tar.gz file is downloaded, extract the files:
cp nifi-1.1.1-bin.tar.gz /home/ubuntu/demo cd /home/ubuntu/demo tar -xvf nifi-1.1.1-bin.tar.gz
The following files and folder are extracted, as shown in the following screenshot:

Start NiFi as follows:
/bin/nifi.sh start
NiFi is started in the background. To check whether NiFi is running successfully or not, use:
/bin/nifi.sh status
When NiFi is started, you can access the NiFi UI by accessing the following URL: http://localhost:8080/nifi. The following screenshot shows the UI interface for NiFi:

Now, let's a create flow file in NiFi which will read the file and push each line as an event in the Kafka topic named as nifi-example.
First, create a topic in Kafka using the following command:
/bin/kafka-topics.sh --create --topic nifi-example --zookeeper localhost:2181 --partitions 1 --replication-factor 1
Now, go to the NiFi UI. Select Processor and drag it into the window. It will show all available processors in NiFi. Search for GetFile and select it. It will display in your workspace area as in the following screenshot:

To configure processor, right click on the GetFile processor and select Configure as shown in the following screenshot:

It will give you the flexibility to change all possible configurations related to the processor type. As per the scope of this chapter, let's go to properties directly.
Apply the properties as shown in the following screenshot:

Input Directory is the directory where the logs/files are kept. File Filter is a regular expression to filter out the files in the directory. Let's suppose that the directory has application level logs as well as system level logs and we want to process only application level logs. In this case, the file filter can be used. Path Filter is a filter for sub directories. If log directory has multiple sub directories, then this filter can be used. Batch Size is the maximum number of files that will be fetched in one iteration of the run. If you don't want to delete source files, then set Keep Source File as true. Recurse Subdirectories is the property used whenever we need to scan sub directories in the log directory. If so, then set it to true; otherwise, set it to false. Polling Interval is the time after which the process will look for new files in the log directory. If you want to process hidden files in the log directory, then set Ignore Hidden Files as false.
To read the file we used the GetFile processor, now we want to push each line on the Kafka topic, then use the PutKafka processor. Again, click on processor and drag it into the workspace area.
After the mouse drop, it will ask for the type of processor. Search processor as PutKafka and select it. It will be shown in the following screenshot:

Now, right click on PutKafka and select configure for configuration. Set the configurations as shown in the following screenshot:

Some of the important configurations are Known Brokers, Topic Name, Partition, and Client Name.
You can specify the broker host name along with port number in Known Brokers. Multiple brokers are separated by a comma. Specify the topic name which is created on Kafka broker. Partition is used when a topic is partitioned. Client Name should be any relevant name for the client to make a connection with Kafka.
Now, make a connection between GetFile processor and PutKafka processor. Drag the arrow from GetFile processor and drop to PutKafka processor. It will create a connection between them.
Create a test file in /home/ubuntu/demo/ files and some words or statements, as follows:
hello this
is nifi kafka integration example
Before running NiFi Pipeline, start a process from the console to read from the Kafka topic nifi-example:
/bin/kafka-console-consumer.sh --topic nifi-example --bootstrap-server localhost:9092 --from-beginning
Let's start the NiFi Pipeline which reads from the test file and puts it into Kafka. Go to the NiFi workspace, press select all (Shift + A) and press the Play button from the operate window.
The output is as shown in the following screenshot:

NiFi output is as seen in the following screenshot:
