Connecting Kafka and Azure Functions securely within a closed network
Published Jun 08 2021

1. Introduction

Azure offers two PaaS options for Apache Kafka: "Event Hubs for Kafka" and Kafka clusters on HDInsight. The Event Hubs option is more fully managed by the cloud, while the HDInsight option leaves more management in your hands. With either option, you need a "Producer" that sends data to Kafka and a "Consumer" that retrieves data from it.
In this post, you will learn how to develop a scalable Kafka Producer using Azure Functions.

 

2. Architecture 

Here is a sample architecture in which a Kafka Producer running on Azure Functions sends data to a Kafka cluster on HDInsight. HDInsight also offers a REST Proxy as an option for reaching Kafka endpoints inside an Azure VNET. By setting up VNET Integration on Azure Functions and connecting it to the VNET, your application can securely access Kafka on HDInsight through the VNET's subnets.

[Figure: sample architecture — Kafka Producer on Azure Functions connecting through VNET Integration to a Kafka cluster on HDInsight]

 

3. Setup environment

Create a VNET and subnets for this sample architecture. Adjust the resource names and IP address spaces for your environment.

 

 

export MyResourceGroup="myRG"
export vnetName="myVnet"
export VnetPrefix="10.10.0.0/16"
export FunctionsSubnet="Functions-Subnet"
export FunctionsSubnetPrefix="10.10.10.0/24"
export KafkaSubnet="Kafka-Subnet"
export KafkaSubnetPrefix="10.10.20.0/24"
export location="japaneast"

# Create Resource Group
az group create --name $MyResourceGroup --location $location

# Create VNET with a Subnet for Functions. An address range of /24 or larger is required for VNet Integration
az network vnet create -g $MyResourceGroup -n $vnetName --address-prefix $VnetPrefix \
    --subnet-name $FunctionsSubnet --subnet-prefix $FunctionsSubnetPrefix

# Create Subnet for Kafka on HDInsight
az network vnet subnet create -g $MyResourceGroup --vnet-name $vnetName -n $KafkaSubnet \
    --address-prefixes $KafkaSubnetPrefix

 

 

Create an Azure Functions app with a Standard or Premium plan to use VNET Integration. We chose Premium (EP1) for this sample.

 

 

# Function app and storage account names must be unique.
export storageName=functionstor$RANDOM
export functionAppName=functionkafkaproducer
export premiumplanName=myappspremplan

# Create an Azure Storage account
az storage account create \
  --name $storageName \
  --location $location \
  --resource-group $MyResourceGroup \
  --sku Standard_LRS

# Create a Premium plan
az functionapp plan create \
  --name $premiumplanName \
  --resource-group $MyResourceGroup \
  --location $location \
  --sku EP1

# Create a Function App
az functionapp create \
  --name $functionAppName \
  --storage-account $storageName \
  --plan $premiumplanName \
  --resource-group $MyResourceGroup \
  --runtime java \
  --functions-version 3

 

 

Create the HDInsight Kafka cluster on the subnet you created earlier by using the --subnet option. Passing the subnet name works when you deploy the Kafka cluster into a subnet within the same resource group; to deploy into a subnet in a different resource group, you must pass the full subnet resource ID instead (see the sketch after the script below).

 

 

export clusterName="mykafkacluster"
export storageAccount="myhdikafkastor1"
export httpPassword='<Your Cluster Admin Password>'
export sshPassword='<Your SSH Password>'

export storageContainer=$(echo $clusterName | tr "[:upper:]" "[:lower:]")
export workernodeCount=3
export clusterType=kafka
export clusterVersion=4.0
export componentVersion=kafka=2.1

# Create storage account
az storage account create \
    --name $storageAccount \
    --resource-group $MyResourceGroup \
    --https-only true \
    --kind StorageV2 \
    --location $location \
    --sku Standard_LRS

# Export primary key of Storage Account
export storageAccountKey=$(az storage account keys list \
    --account-name $storageAccount \
    --resource-group $MyResourceGroup \
    --query [0].value -o tsv)

# Create blob container
az storage container create \
    --name $storageContainer \
    --account-key $storageAccountKey \
    --account-name $storageAccount

# Create HDInsight Kafka cluster
az hdinsight create \
    --name $clusterName \
    --resource-group $MyResourceGroup \
    --type $clusterType \
    --component-version $componentVersion \
    --http-password $httpPassword \
    --http-user admin \
    --location $location \
    --ssh-password $sshPassword \
    --ssh-user sshuser \
    --storage-account $storageAccount \
    --storage-account-key $storageAccountKey \
    --storage-container $storageContainer \
    --version $clusterVersion \
    --workernode-count $workernodeCount \
    --workernode-data-disks-per-node 2 \
    --vnet $vnetName \
    --subnet $KafkaSubnet
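
For the different-resource-group case, here is a minimal sketch of looking up and passing the subnet resource ID. "otherRG" is a hypothetical resource group holding the VNET, and this assumes the --subnet parameter accepts a full resource ID (in which case --vnet can be omitted):

# Look up the full resource ID of the subnet in another resource group
export subnetId=$(az network vnet subnet show \
    --resource-group otherRG \
    --vnet-name $vnetName \
    --name $KafkaSubnet \
    --query id -o tsv)

# Then in "az hdinsight create", replace "--vnet $vnetName --subnet $KafkaSubnet"
# with "--subnet $subnetId"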

 

 

Set up VNET Integration on Azure Functions. Open your Azure Functions resource, choose "Networking", and click "Click here to configure" under VNet Integration.

[Screenshot: the "Networking" blade of the Azure Functions resource, showing the VNet Integration configuration link]

 

Choose the VNET and subnet to connect to by following the portal prompts.

[Screenshot: selecting the VNET and subnet for VNet Integration]
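
Alternatively, VNET Integration can be configured from the Azure CLI; here is a minimal sketch using the variables defined earlier (check your CLI version, as this subcommand has changed over time):

# Connect the Function App to the Functions subnet in the VNET
az functionapp vnet-integration add \
  --name $functionAppName \
  --resource-group $MyResourceGroup \
  --vnet $vnetName \
  --subnet $FunctionsSubnet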

 

Next, here is an important tip. By default, Azure Functions cannot reach Kafka brokers inside the VNET, so we have to set the following application settings.

Name | Value | Description
WEBSITE_DNS_SERVER | 168.63.129.16 | work with Azure DNS private zones
WEBSITE_VNET_ROUTE_ALL | 1 | route all of your outbound traffic into the VNET
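
These settings can also be applied from the Azure CLI; for example:

# Apply the DNS and routing settings to the Function App
az functionapp config appsettings set \
  --name $functionAppName \
  --resource-group $MyResourceGroup \
  --settings WEBSITE_DNS_SERVER=168.63.129.16 WEBSITE_VNET_ROUTE_ALL=1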

 

Refer to the screenshot below for the portal version of this setup.

 

[Screenshot: application settings with WEBSITE_DNS_SERVER and WEBSITE_VNET_ROUTE_ALL configured]

 

Now we can develop the Kafka Producer for Azure Functions. Add the Maven dependency for the Kafka client as shown below. We use version 2.1.0 to match the Kafka version deployed on HDInsight.

Add the following to the properties section of pom.xml.

 

 

<properties>
    <kafka.version>2.1.0</kafka.version>
</properties>

 

 

Add the following to the dependencies section of pom.xml.

 

 

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>

 

 

Here is a code sample for the Producer. It is a simple example that sends one message to the Kafka cluster per request; you should adjust it for your environment, for example to improve performance. Note that the environment running this code must be able to resolve the Kafka broker host names.

 

 

package com.function;

import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.HttpMethod;
import com.microsoft.azure.functions.HttpRequestMessage;
import com.microsoft.azure.functions.HttpResponseMessage;
import com.microsoft.azure.functions.HttpStatus;
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.annotation.HttpTrigger;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Azure Functions with HTTP Trigger.
 */
public class Function {
    /**
     * This function listens at endpoint "/api/FunctionKafkaProducerJava". Invoke it with the "curl" command in bash:
     * $ curl -XPOST {your host}/api/FunctionKafkaProducerJava?name={topicName} -d 'JSONBODY'
     */

    @FunctionName("FunctionKafkaProducerJava")
    public HttpResponseMessage run(
            @HttpTrigger(
                name = "req",
                methods = {HttpMethod.GET, HttpMethod.POST},
                authLevel = AuthorizationLevel.FUNCTION)
                HttpRequestMessage<Optional<String>> request,
            final ExecutionContext context) {
        context.getLogger().info("Java HTTP trigger processed a request.");

        // Parse query parameter
        final String topicName = request.getQueryParameters().get("name");
        final String reqBody = request.getBody().orElse("");

        // Set properties used to configure the producer
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "<KAFKABROKERS>");
        // Set how to serialize key/value pairs
        properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        // Create Kafka Producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Send message to Kafka
        try
        {
            producer.send(new ProducerRecord<String, String>(topicName, reqBody)).get();            
        }
        catch (Exception e)
        {
            context.getLogger().warning("Failed to send message: " + e.getMessage());
        }

        // Close Kafka producer
        producer.flush();
        producer.close();
        return request.createResponseBuilder(HttpStatus.OK).body(reqBody).build();
    }
}
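
Replace the <KAFKABROKERS> placeholder with a comma-separated list of your broker hosts (port 9092). As a sketch, you can retrieve the broker host names from the cluster's Ambari REST API; this assumes the default "admin" user and that jq is installed:

# Query Ambari for the Kafka broker hosts and build a "host:9092,..." list
export KAFKABROKERS=$(curl -sS -u "admin:$httpPassword" -G \
    "https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER" \
    | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")')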

 

 

As an example of how to improve performance, you can modify the sample code above to send multiple messages at once. The variant below reuses a single Kafka Producer connection to send every message contained in one POST body (assumed here to be one message per line).

 

 

        // Create Kafka Producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Split the request body into individual messages, assuming one message per line
        String[] reqBodyArray = reqBody.split("\\r?\\n");

        // Send all messages to Kafka over a single producer connection
        for (int i = 0; i < reqBodyArray.length; i++) {
            try {
                producer.send(new ProducerRecord<String, String>(topicName, reqBodyArray[i])).get();
            }
            catch (Exception e) {
                context.getLogger().warning("Failed to send message: " + e.getMessage());
            }
        }
        // Close Kafka producer
        producer.flush();
        producer.close();

 

 

The Azure Functions app in this sample uses an HTTP Trigger, so you can send sample data as HTTP POST requests:

 

 

curl -XPOST {your host}/api/FunctionKafkaProducerJava?name={topicName} -d '{<JSONBODY>}'
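
Note that the target topic must already exist on the cluster before the producer can write to it (unless topic auto-creation is enabled). As a sketch, you can create one from an SSH session on a cluster head node; Kafka 2.1 on HDInsight still uses the --zookeeper flag, and $KAFKAZKHOSTS is assumed to hold your ZooKeeper host list:

# Create the topic with 3 replicas and 8 partitions
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create \
    --replication-factor 3 \
    --partitions 8 \
    --topic {topicName} \
    --zookeeper $KAFKAZKHOSTS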

 

 

 
