How to Integrate Azure OpenAI Batch Processing into Your Java Application

This blog post shows you how to integrate Azure OpenAI batch processing into your Java application. The Spring Framework has come a long way in the last 18 months or so. The addition of Spring AI has made it relatively straightforward for Java Spring developers to integrate large language models into their applications, opening the door to RAG (Retrieval Augmented Generation) applications and a great deal more besides. The AI industry as a whole has relentless forward momentum right now, and the infrastructure around it keeps bringing more to the table.

One such offering is the batch processing provided by Azure AI Services. Given the cost of AI generation, it seems only logical that model providers offer ways to make things more affordable, and processing data asynchronously and in bulk is one such option available in the Microsoft Azure cloud. Spring AI does not yet have an implementation for batch processing, but it is still possible to integrate it into your Java apps using the REST endpoints provided by Azure.

Note: In this blog post I will assume that you have already deployed an AI model in Azure AI Services. I will also assume you already have experience connecting it to your Java application. If not, then take a look at my previous blog post, which describes exactly that, here.

1. A brief overview

Azure AI Services will accept a .jsonl file for batch processing and provide another for download when processing is complete. They aim to process files within 24 hours; however, files which take longer do not expire. They provide a number of REST endpoints for this, but first, let’s take a look at the process…

2. Preparing your data for processing

The first step will be creating the .jsonl file with the data you want to process. Azure accepts the following JSON format (shown pretty-printed here; in the actual .jsonl file each request object sits on a single line):

{
  "custom_id": "1",
  "method": "POST",
  "url": "/chat/completions",
  "body": {
    "model": "gpt-4o-batch",
    "messages": [
      {
        "role": "system",
        "content": "You are an AI assistant that helps people find information."
      },
      {
        "role": "user",
        "content": "What is Fortran?"
      }
    ]
  }
}
{
  "custom_id": "2",
  "method": "POST",
  "url": "/chat/completions",
  "body": {
    "model": "gpt-4o-batch",
    "messages": [
      {
        "role": "system",
        "content": "You are an AI assistant that helps people find information."
      },
      {
        "role": "user",
        "content": "Name a hairy dog."
      }
    ]
  }
}


The "custom_id" property is an ID that we define so that we are able to uniquely identify each generated response. Given that we are likely processing data from a database in our application, it makes sense to use the primary key of the data being processed so that we can easily associate the processed data with the original data.

The "url" property shown above is the actual value that should be used here. It may seem odd that we do not use the full URL of our Azure resource, but this relative path is simply appended to the URL of our resource internally by Azure.

In the "body" object we can see the name of the model ("model": "gpt-4o-batch"); this is the name of our deployed model. It is important to note that we need to deploy a model specifically for the purpose of batch processing; we cannot reuse the models we use for regular chat completions.

There are two main types of deployment available, Global and DataZone. The difference is the location in which the data is processed. The uploaded file is always stored in the region of the deployed model; however, Global deployments may process your data anywhere in the world where there is capacity. This could be a problem if you need to stay compliant with GDPR regulations. In that case it is better to deploy a DataZone model, which processes the data in the location it was uploaded to. For developers working with European businesses and organisations this will be the better way to go and will help establish trust in the services being used.

We then see an array of "messages". These are our system and user messages. The system message primes the behaviour of the AI model, and the user message contains the data we want to process. To prepare our data for processing we first need a few records to work with the methods we will create, as well as a DTO to hold the String data to be processed and the "custom_id" of our data.

record JsonlTask(String custom_id, String method, String url, JsonlTaskBody body) {
}

record JsonlTaskBody(String model, List<Message> messages) {
}

record Message(String role, String content) {
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public class DataToProcess {
    private Long caseId;
    private String data;
}


The Message record used in JsonlTaskBody above is a small record of our own which mirrors the role/content structure of Spring AI’s chat messages. Next we will create a method to write the .jsonl file and a helper method to prepare our data. The method writeJsonlFile creates the .jsonl file, stores it as a temp file, and returns the file path so that we can find it later when uploading the file to Azure AI Services. It is good practice to delete this file once it has been successfully submitted to prevent data piling up in the temp folder. This method calls the helper method, which creates a JSON object for each of our DataToProcess DTOs.

public Path writeJsonlFile(List<DataToProcess> dataToProcess) throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);

    Path tempFilePath = Files.createTempFile("data_", ".jsonl");

    try (BufferedWriter writer = new BufferedWriter(new FileWriter(tempFilePath.toFile()))) {
        for (DataToProcess data : dataToProcess) {
            JsonlTask task = createJsonlTask(data);
            writer.write(mapper.writeValueAsString(task));
            writer.newLine();
        }
    }

    return tempFilePath;
}


private JsonlTask createJsonlTask(DataToProcess data) throws IOException {
    String userMessage = data.getData();
    JsonlTaskBody body = new JsonlTaskBody(
            batchDeploymentName, // the name of your batch model deployment
            List.of(
                    // prompt is a Spring Resource pointing at the .st system prompt file
                    new Message("system", prompt.getContentAsString(StandardCharsets.UTF_8)),
                    new Message("user", userMessage)
            )
    );
    return new JsonlTask(data.getCaseId().toString(), "POST", "/chat/completions", body);
}


The system prompt is a .st text file stored in the resources folder of the application.
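For completeness, here is a minimal sketch of how the prompt and the batchDeploymentName used in createJsonlTask could be provided as fields of the same class. The file name and the property key are my own illustrative choices, so adjust them to your project:

// Illustrative field declarations for the class containing the methods above.
// The prompt file name and the deployment-name property key are assumptions.
@Value("classpath:prompts/batch-system-prompt.st")
private Resource prompt;

@Value("${your-app.ai.batch.deployment-name}")
private String batchDeploymentName;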

3. Submitting your data for processing

Now that we have prepared our data, we can submit it to Azure AI Services for processing. Let’s create a method which sends an HTTP request with our data and returns the file_id which Azure AI Services creates for our uploaded file.

Our method will have parameters for the API URL of the file upload endpoint, https://YOUR_RESOURCE_NAME.openai.azure.com/openai/files?api-version=2024-10-21, as well as the API key of your Azure AI Services resource and the file path of the .jsonl file to be processed.
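These values are best externalised in application.properties and injected with @Value, just as we will do later for the download endpoint. Here is a sketch of what the relevant entries could look like; the your-app.ai.batch.* keys are example names of my own, and only spring.ai.azure.openai.api-key is a standard Spring AI property:

# Illustrative configuration; replace YOUR_RESOURCE_NAME and adapt the key names to your app
your-app.ai.batch.upload-file=https://YOUR_RESOURCE_NAME.openai.azure.com/openai/files?api-version=2024-10-21
your-app.ai.batch.file-status=https://YOUR_RESOURCE_NAME.openai.azure.com/openai/files/{file-id}?api-version=2024-10-21
your-app.ai.batch.start-processing=https://YOUR_RESOURCE_NAME.openai.azure.com/openai/batches?api-version=2024-10-21
spring.ai.azure.openai.api-key=YOUR_API_KEY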

public String uploadFile(String apiUrl, String apiKey, Path filePath) {
    RestTemplate restTemplate = new RestTemplate();
    ObjectMapper objectMapper = new ObjectMapper();

    try {
        // Ensure the file exists
        if (!Files.exists(filePath)) {
            throw new RuntimeException("File not found at path: " + filePath);
        }

        // Read file bytes and attach them
        byte[] fileBytes = Files.readAllBytes(filePath);

        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.MULTIPART_FORM_DATA);
        headers.set("api-key", apiKey);

        // Create multipart body
        MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
        body.add("purpose", "batch");
        body.add("file", new ByteArrayResource(fileBytes) {
            @Override
            public String getFilename() {
                return filePath.getFileName().toString();
            }
        });

        HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(body, headers);

        // Send request
        ResponseEntity<String> responseEntity = restTemplate.exchange(apiUrl, HttpMethod.POST, requestEntity, String.class);

        // Parse response to extract "id"
        JsonNode jsonNode = objectMapper.readTree(responseEntity.getBody());
        return jsonNode.get("id").asText();

    } catch (Exception e) {
        throw new RuntimeException("Failed to upload file: " + e.getMessage(), e);
    }
}


Once we have uploaded our file for processing, Azure AI Services will validate it to make sure it conforms to the required format. If anything is not right, an error message is returned detailing what went wrong. Below is an example. The "errors" property is a small part of a larger JSON object which is returned and shows what went wrong:

"errors": {
   "object": "list",
   "data": [
      {
         "code": "empty_file",
         "message": "The input file is empty. Please ensure that the batch contains at least one request."
      }
   ]
}


The file_id is returned immediately, along with one of the following statuses:

pending,
processed,
in_progress,
validating,
finalizing,
completed,
failed,
expired,
cancelling,
cancelled

4. Checking the status of the uploaded file

Once the file_id has been returned we can check the status of the uploaded file. The method we create will call the status endpoint. If the returned status is not "processed", the method will wait a minute before checking again. We also include a 30-minute timeout, so that if "processed" is not returned within 30 minutes the method gives up and returns false.

public boolean checkFileStatus(String apiUrlTemplate, String apiKey, String fileId) {
    String apiUrl = apiUrlTemplate.replace("{file-id}", fileId);

    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    RestTemplate restTemplate = new RestTemplate();
    ObjectMapper objectMapper = new ObjectMapper();

    CompletableFuture<Boolean> result = new CompletableFuture<>();

    Runnable task = () -> {
        try {
            HttpHeaders headers = new HttpHeaders();
            headers.set("api-key", apiKey);

            HttpEntity<Void> requestEntity = new HttpEntity<>(headers);
            String response = restTemplate.exchange(apiUrl, HttpMethod.GET, requestEntity, String.class).getBody();

            JsonNode responseJson = objectMapper.readTree(response);
            String status = responseJson.get("status").asText();

            log.info("File Status: " + status);

            if ("processed".equalsIgnoreCase(status)) {
                log.info("File processing completed.");
                result.complete(true);
                executor.shutdown();
            }
        } catch (Exception e) {
            log.error("Error checking file status: {}", e.getMessage());
            result.complete(false);
            executor.shutdown();
        }
    };

    executor.scheduleAtFixedRate(task, 0, 1, TimeUnit.MINUTES);

    try {
        return result.get(30, TimeUnit.MINUTES);
    } catch (Exception e) {
        log.error("Timeout or error while waiting for file status: {}", e.getMessage());
        executor.shutdownNow();
        return false;
    }
}


The method simply returns true when the status is "processed" so that the following code knows it is time to start the batch process. At first glance, the use of ScheduledExecutorService seems to indicate that this method is asynchronous; however, it is ultimately blocking due to the result.get(…). This is in fact what we want, since once we have all of the methods we need we will call them sequentially from a scheduled task.

Let’s assume our file upload was successfully processed. Our Java application should now associate the returned file_id with the data which has been sent for processing. This could mean a column on the database table which holds the data being processed, so that each data point has the file_id, or perhaps a new table which holds the file_id and which we reference with a 1-N relationship from the original table. How you handle this will depend on your specific application architecture, so I won’t go into more detail here.

5. Starting the batch process

We now know that our .jsonl file has been accepted by Azure AI Services, so we can make the API call to start the process. The following method takes the API URL and API key as parameters, as well as the file_id we have just saved. It returns the id that Azure AI Services assigns to this specific batch process, which we can use to check on the status later and eventually download the processed data. You will need to save this in the same way you did with the file_id earlier.

The request body we create for the API call contains the property completion_window. This must be set to 24h or the process will fail. Perhaps in future Azure may offer additional completion windows, but for now we can only set a value of 24h here. The method waits for a successful (2xx) HTTP response and then returns the id of the batch process. If the call is not successful we log an error which gives us some feedback as to what went wrong and return a sentinel value.

public String startBatchJob(String apiUrl, String apiKey, String inputFileId) {
    RestTemplate restTemplate = new RestTemplate();
    ObjectMapper objectMapper = new ObjectMapper(); // For parsing JSON

    HttpHeaders headers = new HttpHeaders();
    headers.setContentType(MediaType.APPLICATION_JSON);
    headers.set("api-key", apiKey);

    String requestBody = String.format(
            "{ \"input_file_id\": \"%s\", \"endpoint\": \"/chat/completions\", \"completion_window\": \"24h\" }",
            inputFileId
    );

    HttpEntity<String> requestEntity = new HttpEntity<>(requestBody, headers);

    try {
        ResponseEntity<String> responseEntity = restTemplate.exchange(apiUrl, HttpMethod.POST, requestEntity, String.class);

        if (responseEntity.getStatusCode().is2xxSuccessful()) {
            String responseBody = responseEntity.getBody();
            JsonNode jsonNode = objectMapper.readTree(responseBody);

            // Extract and return the "id" property
            if (jsonNode.has("id")) {
                String id = jsonNode.get("id").asText();
                log.info("Batch Job Started Successfully. ID: {}", id);
                return id;
            } else {
                log.error("Response does not contain 'id': {}", responseBody);
                return "-1";
            }
        } else {
            log.error("Batch Job Start Failed with Status: {}", responseEntity.getStatusCode());
            return "-1";
        }
    } catch (Exception e) {
        log.error("Failed to start batch job: {}", e.getMessage());
        return "-1";
    }
}


6. Calling the methods to start processing batches

Now that we have put together the methods and objects that we need, let’s create a batch job which will run the process. The execute method below calls the methods we have created so far, but there are two additional method calls, saveFileId and saveBatchId. You will need to create your own implementations of these to work with your existing codebase.

@Transactional
public void execute() throws IOException {
    // Replace the method below with your own method which fetches your data and
    // puts it into the DTO
    List<DataToProcess> dataToProcess = dataService.getDataForProcessing();
  
    Path tempFilePath = writeJsonlFile(dataToProcess);
    String returnedFileId = uploadFile(uploadFileApiUrl, apiKey, tempFilePath);
    if (checkFileStatus(fileStatusApiUrl, apiKey, returnedFileId)) {
        // create your own saveFileId implementation
        saveFileId(returnedFileId, dataToProcess);
        String batchId = startBatchJob(startFileProcessingApiUrl, apiKey, returnedFileId);
        // create your own saveBatchId implementation
        saveBatchId(batchId, dataToProcess);
    }
    Files.deleteIfExists(tempFilePath);
}


Assuming the batch process was started successfully, we now play a waiting game until our data is ready to be downloaded. That means making an API call to check for completed batches and then downloading the file. Although Azure AI Services offer a completion window of 24 hours, in my experience the process finishes significantly faster than that, often within 10 to 15 minutes of being started. For this reason it is a good idea to run a scheduled task which checks for completed batch processes every 5 minutes; a sketch of such a task follows below. It may also be the case that you are uploading data to be processed multiple times a day, so by continually checking for finished batches you get the results pretty much as soon as they are ready.
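Here is a minimal sketch of such a polling component, assuming scheduling is enabled with @EnableScheduling and that BatchResultFetchJob is a hypothetical bean of your own containing the execute() method from section 9:

// Illustrative polling component; BatchResultFetchJob stands in for your own bean
// holding the execute() method shown in section 9.
@Slf4j
@Component
public class BatchResultPollingScheduler {

    private final BatchResultFetchJob batchResultFetchJob;

    public BatchResultPollingScheduler(BatchResultFetchJob batchResultFetchJob) {
        this.batchResultFetchJob = batchResultFetchJob;
    }

    // Runs every 5 minutes; requires @EnableScheduling on a configuration class
    @Scheduled(fixedDelay = 5, timeUnit = TimeUnit.MINUTES)
    public void pollForCompletedBatches() {
        try {
            batchResultFetchJob.execute();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Interrupted while fetching completed batches", e);
        } catch (Exception e) {
            log.error("Error while fetching completed batches: {}", e.getMessage());
        }
    }
}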

7. Checking for completed batches

With the following method we make an API call which returns a list of the output file_ids of the completed batches. Our request has query parameters for start time and end time, which we pass into our method as parameters. This is because we only want the file_ids of batches completed since we last checked, and we are checking every five minutes.

public List<String> getCompletedBatchOutputFileIds(long startTime, long endTime) throws IOException, InterruptedException {
    String query = "?api-version=2024-10-01-preview" +
            "&$filter=created_at%20gt%20" + startTime +
            "%20and%20created_at%20lt%20" + endTime +
            "%20and%20status%20eq%20'Completed'" +
            "&$orderby=created_at%20asc";

    HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(fetchListOfBatchesApiUrl + query))
            .header("api-key", apiKey)
            .GET()
            .build();

    HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

    ObjectMapper objectMapper = new ObjectMapper();
    JsonNode root = objectMapper.readTree(response.body());

    List<String> outputFileIds = new ArrayList<>();
    JsonNode dataArray = root.path("data");

    for (JsonNode batch : dataArray) {
        String status = batch.path("status").asText();
        if ("completed".equalsIgnoreCase(status)) {
            String outputFileId = batch.path("output_file_id").asText();
            outputFileIds.add(outputFileId);
        }
    }

    return outputFileIds;
}


8. Fetching the processed data

Once we have one or more file_ids of completed batches we can fetch the data. When the file is downloaded we can parse the JSON data to extract the AI-generated response for use in our application. Included in the JSON object for each generation is the "custom_id" property, which we set to the primary key of the data being processed. With this we can very easily associate the processed data with the original data and persist it or work with it further in our application. I have added an example of doing something with the data in this method; however, the method also returns the JSON data if you prefer to work with it in a separate method.
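For orientation, each line of the downloaded file contains (among other fields) the values the method below reads. A trimmed, illustrative response line, shown pretty-printed and with fields we do not use omitted, looks roughly like this:

{
  "custom_id": "1",
  "response": {
    "body": {
      "choices": [
        {
          "message": {
            "role": "assistant",
            "content": "Fortran is a programming language dating back to the 1950s…"
          }
        }
      ]
    }
  }
}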

public List<JsonNode> fetchProcessedFile(String apiUrlTemplate, String apiKey, String outputFileId) {

    String apiUrl = apiUrlTemplate.replace("{output_file_id}", outputFileId);

    RestTemplate restTemplate = new RestTemplate();
    ObjectMapper objectMapper = new ObjectMapper();

    try {
        HttpHeaders headers = new HttpHeaders();
        headers.set("api-key", apiKey);

        HttpEntity<Void> requestEntity = new HttpEntity<>(headers);
        ResponseEntity<byte[]> responseEntity = restTemplate.exchange(apiUrl, HttpMethod.GET, requestEntity, byte[].class);

        if (responseEntity.getStatusCode().is2xxSuccessful()) {
            byte[] fileContent = responseEntity.getBody();

            Path tempFilePath = Files.createTempFile("batch_output", ".jsonl");
            Files.write(tempFilePath, fileContent);

            log.info("File successfully fetched and saved to temp folder: " + tempFilePath.toAbsolutePath());

            List<JsonNode> extractedData = new ArrayList<>();
            try (BufferedReader reader = Files.newBufferedReader(tempFilePath)) {
                String line;
                while ((line = reader.readLine()) != null) {
                    JsonNode jsonNode = objectMapper.readTree(line);
                    extractedData.add(jsonNode);
                }
            }

            extractedData.forEach(jsonNode -> {
                Long id = jsonNode.get("custom_id").asLong();
                JsonNode responseBody = jsonNode.path("response").path("body");
                String processedData = responseBody
                        .path("choices")
                        .get(0)
                        .path("message")
                        .path("content")
                        .asText();
		
                // here is where you can implement your own logic
                // for the processed data
                YourObject yourObject = yourObjectService.findById(id);
                yourObject.doSomething(processedData);
                yourObjectService.updateYourObject(yourObject);
            });

            log.info("Extracted data size: " + extractedData.size());
            return extractedData;
        } else {
            throw new RuntimeException("Failed to fetch file. HTTP Status: " + responseEntity.getStatusCode());
        }
    } catch (Exception e) {
        log.error("Error fetching processed file or extracting data: {}", e.getMessage());
        throw new RuntimeException("Error fetching processed file or extracting data", e);
    }
}


9. A batch job to fetch the data

If we set up a batch job scheduled to run every 5 minutes (as sketched at the end of section 6), then the following execute() method can be used to fetch the processed data.

@Transactional
public void execute() throws IOException, InterruptedException {

    Instant start = Instant.now().minus(5, ChronoUnit.MINUTES);
    Instant end = Instant.now();
    long startTime = start.getEpochSecond();
    long endTime = end.getEpochSecond();
    List<String> outputFileIds = getCompletedBatchOutputFileIds(startTime, endTime);
    if (!outputFileIds.isEmpty()) {
        outputFileIds.forEach(id -> {
            fetchProcessedFile(fetchProcessedFileApiUrl, apiKey, id);
        });
    }
}


We will need to add the following annotated variables for the URL and API key.

@Value("${your-app.ai.batch.fetch-processed-file}")
private String fetchProcessedFileApiUrl;

@Value("${spring.ai.azure.openai.api-key}")
private String apiKey;


10. What if something goes wrong?

Our implementation for fetching the file_ids of completed batches does not take into consideration that a batch may have failed; it simply looks for batches which have been successfully processed so that we can fetch the data. Azure AI Services provide a number of additional endpoints, among others for checking the status of a particular batch and for cancelling batch processing. Passing the batch_id as a parameter to the following method, we are able to call an API that returns the status of a specific batch.

public static String fetchBatchDetails(String resourceName, String batchId, String apiKey) throws Exception {
    String url = String.format("https://%s.openai.azure.com/openai/batches/%s?api-version=2024-10-21",
            resourceName, batchId);

    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(url))
            .header("api-key", apiKey)
            .GET()
            .build();

    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());

    if (response.statusCode() == 200) {
        return response.body();
    } else {
        throw new RuntimeException("Failed to fetch batch details: " + response.statusCode() + " " + response.body());
    }
}


The returned JSON is in the following format:

{
  "cancelled_at": null,
  "cancelling_at": null,
  "completed_at": null,
  "completion_window": "24h",
  "created_at": "2024-07-19T17:33:29.1619286+00:00",
  "error_file_id": null,
  "expired_at": null,
  "expires_at": "2024-07-20T17:33:29.1578141+00:00",
  "failed_at": null,
  "finalizing_at": null,
  "id": "batch_e0a7ee28-82c4-46a2-a3a0-c13b3c4e390b",
  "in_progress_at": null,
  "input_file_id": "file-c55ec4e859d54738a313d767718a2ac5",
  "errors": null,
  "metadata": null,
  "object": "batch",
  "output_file_id": null,
  "request_counts": {
    "total": null,
    "completed": null,
    "failed": null
  },
  "status": "Validating"
}


In this example the status is "Validating".

11. Cancelling a batch

In the event you decide to cancel a batch you can use the following method. Its structure is very similar to the previous method in terms of the parameters it takes.

public static String cancelBatch(String resourceName, String batchId, String apiKey) throws Exception {
    String url = String.format("https://%s.openai.azure.com/openai/batches/%s/cancel?api-version=2024-10-21",
            resourceName, batchId);

    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(url))
            .header("api-key", apiKey)
            .POST(HttpRequest.BodyPublishers.noBody()) // Empty body for the POST request
            .build();

    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());

    if (response.statusCode() == 200) {
        return response.body();
    } else {
        throw new RuntimeException("Failed to cancel batch: " + response.statusCode() + " " + response.body());
    }
}


The status of the cancelled batch will be cancelling for approximately 10 minutes, after which the status becomes cancelled. The response to this request will contain the batch file id, and the file is still downloadable, as it may contain partial results.

12. A problem solved

This workflow solved a specific problem for me while developing a legal case summarisation feature for a project here at CIIT Software. Cost was a major concern when considering an AI solution, but summarising a case could potentially balance that by speeding up the workflow of the end users. By finding a way to use the batch processing offered by Azure AI Services without relying on Spring, I have been able to approximately halve the cost of AI generation. Batch processing is great if you don’t need an immediate response as you would with AI chat.
