How to transform a log4j message to fit an avro schema and post to kafka

July 05, 2022, at 06:00 AM

I am working on a system that sends the logs for all microservices to a single Apache Kafka topic. Most services are in Python, but we are now forwarding logs from a Kafka Streams app as well. All services share the same schema, defined in Avro and managed by Confluent's Schema Registry. I can get data posting to Kafka fine as a string, but I cannot figure out how to publish a valid Avro object tied to a Schema Registry schema. I am currently attempting to do this via a custom log4j plugin. For testing purposes I am writing these logs to their own topic and reading them back with kcat -b localhost:9092 -s value=avro -r localhost:8081 -t new_logs -f 'key: %k value: %s Partition: %p\n\n', but I get the following error:

ERROR: Failed to format message in new_logs [0] at offset 0: Avro/Schema-registry message deserialization: Invalid CP1 magic byte 115, expected 0: message not produced with Schema-Registry Avro framing: terminating
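
From what I can tell, 115 is the ASCII code for 's', so kcat is seeing plain text where it expects Confluent's wire format: a zero magic byte, then a 4-byte big-endian schema ID, then the binary-encoded Avro payload. Schematically (a sketch only; schemaId stands in for whatever ID the registry assigns, and in practice Confluent's serializer builds this framing for you):

static byte[] confluentFrame(int schemaId, byte[] binaryAvro) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(0); // magic byte, always 0
    byte[] id = ByteBuffer.allocate(4).putInt(schemaId).array(); // 4-byte big-endian schema ID
    out.write(id, 0, id.length);
    out.write(binaryAvro, 0, binaryAvro.length); // binary Avro encoding, not JSON
    return out.toByteArray();
}

So the real question is how to produce that framing from inside a log4j layout.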

That kcat command does work for my actual service-logs topic and for every other topic that carries valid Avro, so the problem is in how this layout serializes the record. Originally I tried using the org.apache.avro.generic.GenericData.Record class, but I could not figure out how to make it work in the toSerializable and toByteArray methods required by AbstractLayout, since that class does not implement Serializable. Below are the plugin class and the log4j config.

ServiceLogLayout.java

@Plugin(name="ServiceLogLayout", category= Node.CATEGORY, elementType = Layout.ELEMENT_TYPE, printObject = true)
public class ServiceLogLayout extends AbstractLayout<byte[]> {
    Schema record;
    DatumWriter<GenericData.Record> serviceLogDatumWriter;
    public ServiceLogLayout() {
        // maybe set these for avro
        super(null, null, null);
        Schema timestampMilliType = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
        // CREATE SCHEMA
        Schema.Field service = new Schema.Field("service", SchemaBuilder.builder().stringType(), "Name of service sending this message");
        Schema.Field environment = new Schema.Field("environment", SchemaBuilder.builder().enumeration("environment_type").symbols("local", "dev", "staging", "prod", "shared_services", "testing", "security"));
        Schema.Field level = new Schema.Field("level", SchemaBuilder.builder().enumeration("level_type").symbols("debug", "info", "notice", "warning", "error", "critical", "alert", "emergency"), "logging level");
        Schema.Field msg = new Schema.Field("msg", SchemaBuilder.builder().stringType(), "Required log message");
        List<Schema.Field> fields = new ArrayList<>();
        fields.add(service);
        fields.add(environment);
        fields.add(level);
        fields.add(msg);
        this.record = Schema.createRecord("service_logs", "", "com.test.avro", false, fields);
        this.serviceLogDatumWriter = new GenericDatumWriter<>(this.record);
    }
    @Override
    public byte[] toByteArray(LogEvent event) {
        LOGGER.warn("toByteArray");
        String env = System.getenv("ENVIRONMENT") != null ? System.getenv("ENVIRONMENT").toLowerCase() : "local";
        // FILL IN RECORD
        GenericRecordBuilder schemaBuilder = new GenericRecordBuilder(this.record);
        schemaBuilder.set("service", "testService");
        schemaBuilder.set("environment", new GenericData.EnumSymbol(this.record.getField("environment").schema(), env));
        schemaBuilder.set("level", new GenericData.EnumSymbol(this.record.getField("level").schema(), event.getLevel().name().toLowerCase()));
        schemaBuilder.set("msg", event.getMessage().getFormattedMessage());
        
        // SERIALIZE
        byte[] data = new byte[0];
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Encoder jsonEncoder = null;
        try {
            jsonEncoder = EncoderFactory.get().jsonEncoder(
                    this.record, stream);
            this.serviceLogDatumWriter.write(schemaBuilder.build(), jsonEncoder);
            jsonEncoder.flush();
            data = stream.toByteArray();
        } catch (IOException e) {
            LOGGER.error("Serialization error:" + e.getMessage());
        }
        return data;
    }
    @Override
    public byte[] toSerializable(LogEvent event) {
        return toByteArray(event);
    }
    @Override
    public String getContentType() {
        return null;
    }
    @PluginFactory
    public static Layout<?> createLayout() {
        return new ServiceLogLayout();
    }
    // leftover from the original GenericData.Record/Serializable attempt described above; currently unused
    private static class PrivateObjectOutputStream extends ObjectOutputStream {
        public PrivateObjectOutputStream(final OutputStream os) throws IOException {
            super(os);
        }
        @Override
        protected void writeStreamHeader() {
            // do nothing
        }
    }
}
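
As an aside, swapping the jsonEncoder above for Avro's binary encoder would produce valid binary Avro, but still without the registry framing, so kcat would still reject it on the first byte. A minimal sketch of that variant (same try/catch as above):

ByteArrayOutputStream stream = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
this.serviceLogDatumWriter.write(schemaBuilder.build(), encoder);
encoder.flush();
byte[] data = stream.toByteArray(); // valid Avro bytes, but unframed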

log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN" packages="logging.log4j.custom.plugins">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <ServiceLogLayout />
        </Console>
        <Kafka name="Kafka" topic="new_logs">
            <ServiceLogLayout />
            <Property name="bootstrap.servers">${env:BOOTSTRAP_SERVERS}</Property>
        </Kafka>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="Kafka"/>
        </Root>
        <Logger name="org.apache.kafka" level="WARN"/>
    </Loggers>
</Configuration>
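
One configuration detail that is easy to miss: the packages attribute on <Configuration> must name the package that actually contains the plugin class, or log4j will never discover the @Plugin annotation. With the config above, the layout would need to live in that package (the package name here is just echoed from the config; substitute your real one):

package logging.log4j.custom.plugins;

@Plugin(name = "ServiceLogLayout", category = Node.CATEGORY, elementType = Layout.ELEMENT_TYPE, printObject = true)
public class ServiceLogLayout extends AbstractLayout<byte[]> {
    // ... as defined above
}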
Answer 1

OneCricketeer had the right idea; here is the implementation:

@Plugin(name = "ServiceLogLayout", category = Node.CATEGORY, elementType = Layout.ELEMENT_TYPE, printObject = true)
public class ServiceLogLayout extends AbstractLayout<byte[]> {
    Schema record;
    SchemaRegistryClient client;
    KafkaAvroSerializer kafkaAvroSerializer;
    public ServiceLogLayout() {
        // maybe set these for avro
        super(null, null, null);
        Schema timestampMilliType = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
        // CREATE SCHEMA
        Schema.Field service = new Schema.Field("service", SchemaBuilder.builder().stringType(), "Name of service sending this message");
        Schema.Field environment = new Schema.Field("environment", SchemaBuilder.builder().enumeration("environment_type").symbols("local", "dev", "staging", "prod", "shared_services", "testing", "security"));
        Schema.Field level = new Schema.Field("level", SchemaBuilder.builder().enumeration("level_type").symbols("debug", "info", "notice", "warning", "error", "critical", "alert", "emergency"), "logging level");
        Schema.Field msg = new Schema.Field("msg", SchemaBuilder.builder().stringType(), "Required log message");
        Schema.Field data = new Schema.Field("data", SchemaBuilder.builder().nullable().stringType(), "Optional extra data, such as stack frames");
        Schema.Field timestamp = new Schema.Field("timestamp", SchemaBuilder.builder().type(timestampMilliType));
        List<Schema.Field> fields = new ArrayList<>();
        fields.add(service);
        fields.add(environment);
        fields.add(level);
        fields.add(msg);
        fields.add(data);
        fields.add(timestamp);
        this.record = Schema.createRecord("service_logs", "", "com.test.avro", false, fields);
        client = new CachedSchemaRegistryClient("http://schema-registry:8081", 10000); // 10000 = schema cache capacity
        kafkaAvroSerializer = new KafkaAvroSerializer(client); // create once; building it per log event would be wasteful
    }
    @Override
    public byte[] toByteArray(LogEvent event) {
        String env = System.getenv("ENVIRONMENT") != null ? System.getenv("ENVIRONMENT").toLowerCase() : "local";
        // FILL IN RECORD
        GenericRecordBuilder schemaBuilder = new GenericRecordBuilder(this.record);
        schemaBuilder.set("service", "testService");
        schemaBuilder.set("environment", new GenericData.EnumSymbol(this.record.getField("environment").schema(), env));
        schemaBuilder.set("level", new GenericData.EnumSymbol(this.record.getField("level").schema(), event.getLevel().name()));
        schemaBuilder.set("msg", event.getMessage().getFormattedMessage());
        schemaBuilder.set("data", null);
        schemaBuilder.set("timestamp", event.getTimeMillis());
        // SERIALIZE: the serializer registers the schema if needed and prepends the magic byte + schema ID framing.
        // The first argument determines the registry subject ("service_logs-value"), not the Kafka topic written to.
        return kafkaAvroSerializer.serialize("service_logs", schemaBuilder.build());
    }
    @Override
    public byte[] toSerializable(LogEvent event) {
        return toByteArray(event);
    }
    @Override
    public String getContentType() {
        return null;
    }
    @PluginFactory
    public static Layout<?> createLayout() {
        return new ServiceLogLayout();
    }
}
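
A quick way to sanity-check the framing without a running broker is to round-trip through Confluent's serializer and deserializer against a MockSchemaRegistryClient (it ships with the schema-registry client library; record below is assumed to be a GenericData.Record built the same way as in toByteArray):

SchemaRegistryClient mock = new MockSchemaRegistryClient();
KafkaAvroSerializer serializer = new KafkaAvroSerializer(mock);
KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(mock);

byte[] framed = serializer.serialize("new_logs", record);
System.out.println(framed[0]); // 0, the Confluent magic byte
GenericRecord back = (GenericRecord) deserializer.deserialize("new_logs", framed);
System.out.println(back.get("msg")); // original log message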

It is worth noting that using Logstash might be a good solution to this problem as well.
