Giter VIP home page Giter VIP logo

apache-pulsar-cheat-sheet's Introduction

Cheat Sheet for Apache Pulsar πŸš€

Use Pulsar in Docker

$ docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  --mount source=pulsardata,target=/pulsar/data \
  --mount source=pulsarconf,target=/pulsar/conf \
  apachepulsar/pulsar:latest \
  bin/pulsar standalone

Basic CLI Commands

Create a tenant

$ pulsar-admin tenants create demo

Create a namespace

$ pulsar-admin namespaces create demo/tests

Create a partitioned-topic with defaults tenant/namespace (default/public)

$ pulsar-admin topics create-partitioned-topic cheat_sheet_topic --partitions 3

Note: by default a topic is persistent, i.e. messages are persisted to storage nodes (bookies)

Create a partitioned-topic for a specific tenant and namespace.

$ pulsar-admin topics create-partitioned-topic persistent://my-tenant/my-namespace/cheat_sheet_topic --partitions 3

List partitioned-topics

$ pulsar-admin topics list-partitioned-topics public/default

Get stats about a topic

$ pulsar-admin topics stats persistent://public/default/cheat_sheet_topic

Produce messages

$ pulsar-client produce cheat_sheet_topic --messages "first message, second message, third message"

Consume messages

$ pulsar-client consume -s "my-subscription" cheat_sheet_topic -n 0 -p Earliest

Reset cursor to a specific time

$ pulsar-admin persistent reset-cursor persistent://public/default/cheat_sheet_topic -s my-subscription --time '1d'

Client API

Dependencies

Java (using Maven)

<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>${pulsar.version}</version>
</dependency>

Client: Create basic PulsarClient

Java

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://broker:6650")
    .build();

Producer: Create basic Producer

Java

Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("cheat_sheet_topic")
    .create();

Producer: Create Producer with RoundRobin Routing Mode

Java

Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("cheat_sheet_topic")
    .hashingScheme(HashingScheme.Murmur3_32Hash) // default is Java.hashCode()
    .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
    .create();

Producer: Create Producer with Single Partition Routing Mode

If no key is provided on the message, the producer will randomly pick one single partition and publish all the messages into that partition.

Java

Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("cheat_sheet_topic")
    .hashingScheme(HashingScheme.Murmur3_32Hash) // default is Java.hashCode()
    .messageRoutingMode(MessageRoutingMode.SinglePartition)
    .create();

Producer: Create Producer with Custom Router

Java

Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("cheat_sheet_topic")
    .messageRoutingMode(MessageRoutingMode.CustomPartition)
    .messageRouter(new MessageRouter() {
        @Override
        public int choosePartition(Message<?> msg, TopicMetadata metadata) {
            String key = msg.getProperty("routing_key");
            return MathUtils.signSafeMod(Murmur3_32Hash.getInstance().makeHash(key), metadata.numPartitions());
        }
    })
    .create();

Producer: Create Batching Producer

Java

Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("cheat_sheet_topic")
    .enableBatching(true)
    .batchingMaxBytes(5 * 1024 * 1024) // 5MB
    .batchingMaxPublishDelay(200, TimeUnit.MILLISECONDS)
    .blockIfQueueFull(true)
    .sendTimeout(30, TimeUnit.SECONDS)
    .compressionType(CompressionType.ZSTD)
    .batcherBuilder(BatcherBuilder.KEY_BASED)
    .hashingScheme(HashingScheme.Murmur3_32Hash)
    .create();

Consumer: Create a durable Consumer with Key Shared subscription

Java

try(Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("cheat_sheet_topic")
    .subscriptionName("cheatSeetsubscription")
    .subscriptionMode(SubscriptionMode.Durable)
    .subscriptionType(SubscriptionType.Key_Shared)
    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
    .subscribe()
) {

    while (true) {
        Message<String> message = consumer.receive();
        try {
            System.out.printf(
                "Message received: key=%s, value=%s, topic=%s, id=%s%n",
                message.getKey(),
                message.getValue(),
                message.getTopicName(),
                message.getMessageId().toString());
            consumer.acknowledge(message);
        } catch (Exception e) {
            // Failed to process message, mark it for redelivery
            consumer.negativeAcknowledge(message);
        }
    }
}

Consumer: Read a partitioned-topic from the beginning to last published message.

Java

// Create a PulsarClient
PulsarClient client = ...

// List all partitions for topic
List<String> topics = client.getPartitionsForTopic("test_hello").get();

// Create as many readers as topic-partitions
List<CompletableFuture<Reader<String>>> readers = topics.stream()
    .map(topic ->
        client.newReader(Schema.STRING)
            .topic(topic)
            .startMessageId(MessageId.earliest)
            .createAsync()
    ).collect(Collectors.toList());

// Create a fixed-sized Thread pool.
ExecutorService service = Executors.newFixedThreadPool(readers.size());

// Submit one task for each reader
for (CompletableFuture<Reader<String>> future : readers) {
    service.submit(() -> {
        try (Reader<String> reader = future.get()) {
            while (reader.hasMessageAvailable()) {
                Message<String> message = reader.readNext();
                System.out.printf(
                    "Message received: key=%s, value=%s, topic=%s, id=%s%n",
                    message.getKey(),
                    message.getValue(),
                    message.getTopicName(),
                    message.getMessageId().toString());
            }
            System.err.printf("[%s]No message available for topic %s %n",
                Thread.currentThread().getName(),
                reader.getTopic());
        } catch (IOException ignore) {
        } catch (Exception e) {
           throw new RuntimeException("Cannot get reader", e);
        }
    });
}
service.shutdown();
service.awaitTermination(5, TimeUnit.MINUTES);
client.close();

Consumer: Reset consumer subscription to either Earliest or Latest

Java

public void resetSubscriptionOffsetsTo(final Consumer<?> consumer,
                                       final SubscriptionInitialPosition strategy) throws PulsarClientException {
    Objects.requireNonNull(consumer, "consumer cannot be null");
    Objects.requireNonNull(strategy, "strategy cannot be null");
    System.out.printf(
        "Resetting partition %s for subscription %s to %s position %n",
        consumer.getTopic(),
        consumer.getSubscription(),
        strategy
    );
    consumer.seek(strategy == SubscriptionInitialPosition.Earliest ? MessageId.earliest : MessageId.latest);
}

Note: this operation can only be done on non-partitioned topics.

Admin API

Dependencies

Java (using Maven)

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client-admin</artifactId>
    <version>${pulsar.version}</version>
</dependency>

Admin: Create basic PulsarAdmin

Java

PulsarAdmin admin = PulsarAdmin
        .builder()
        .serviceHttpUrl("http://localhost:8080")
        .build();

Admin: Check if topic exists

Java

public boolean topicExists(final PulsarAdmin admin,
                           final String topicName) throws PulsarAdminException {
    int partitionNum = admin.topics().getPartitionedTopicMetadata(topicName).partitions;
    if (partitionNum == 0) {
        try {
            admin.topics().getStats(topicName);
        } catch (PulsarAdminException.NotFoundException e) {
            return false;
        }
    }
    return true;
}

Admin: Check if topic is partitioned

Java

public boolean isTopicPartitioned(final PulsarAdmin admin, final String topicName) throws PulsarAdminException {
    return admin.topics().getPartitionedTopicMetadata(topicName).partitions > 0;
}

Admin: Create topic

Java

public void createTopic(final PulsarAdmin admin,
                        final String topicName,
                        final int defaultPartitionNum) throws PulsarAdminException {
    if (defaultPartitionNum > 0)
        admin.topics().createPartitionedTopic(topicName, defaultPartitionNum);
    else
        admin.topics().createNonPartitionedTopic(topicName);
}

Admin: Delete a topic

Java

public void deleteTopic(final PulsarAdmin admin, final String topicName) throws PulsarAdminException {
    if (isTopicPartitioned(admin, topic))
        admin.topics().deletePartitionedTopic(topicName, true);
    else
        admin.topics().delete(topicName, true);
}

License

Copyright 2020 StreamThoughts.

Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

apache-pulsar-cheat-sheet's People

Contributors

fhussonnois avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    πŸ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. πŸ“ŠπŸ“ˆπŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❀️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.