
stephensorriaux / ansible-kafka-admin


Manage your topics' configuration (partitions, replication factor, parameters), ACLs, quotas, and users, and get stats, without any effort with this library. It does not use the Kafka scripts and does not require an SSH connection to the remote broker.

License: Apache License 2.0

Python 97.57% Jinja 0.67% Shell 1.75%
kafka kafka-client ansible library kafka-topic python kafka-topics kafka-quotas kafka-acl consumer-groups

ansible-kafka-admin's People

Contributors

bgk, dependabot[bot], f30, laurentoliva, ryarnyah, saiello, stephensorriaux, teebee, vgivanov, ymilhi

ansible-kafka-admin's Issues

/admin/reassign_partitions is already there after 5 retries

We are managing several Kafka topics in a loop, and sometimes we get failures with the message /admin/reassign_partitions is already there after 5 retries.

I dug through the code and have a couple of questions:

  1. This znode is being created in update_admin_assignment with
    self.zk_client.create(self.ZK_REASSIGN_NODE, json_assignment)

  2. This is creating a non-ephemeral znode

  3. I can't see any reference to it being deleted.

However, the znode does get deleted at some point. Where does that deletion happen?

I modified the module so that the znode is created with ephemeral=True and, for good measure, I delete the znode before closing the ZooKeeper connection, to ensure it's deleted in a timely manner and not left hanging around when the next iteration of the loop runs.
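
The cleanup pattern described above can be sketched as a context manager that guarantees the znode is removed even if the reassignment step fails. The `FakeZk` class below is an in-memory stand-in for a kazoo client, purely for illustration; the real module holds the connection as `self.zk_client`:

```python
from contextlib import contextmanager

@contextmanager
def reassign_node(zk_client, path, payload):
    """Create the reassignment znode and guarantee deletion on exit,
    mirroring the workaround: ephemeral=True plus an explicit delete
    before the connection is closed."""
    zk_client.create(path, payload, ephemeral=True)
    try:
        yield
    finally:
        if zk_client.exists(path):
            zk_client.delete(path)

class FakeZk:
    """In-memory stand-in for a kazoo client, only for this sketch."""
    def __init__(self):
        self.nodes = {}
    def create(self, path, payload, ephemeral=False):
        self.nodes[path] = payload
    def exists(self, path):
        return path in self.nodes
    def delete(self, path):
        del self.nodes[path]

zk = FakeZk()
with reassign_node(zk, "/admin/reassign_partitions", b"{}"):
    assert zk.exists("/admin/reassign_partitions")  # present while working
print(zk.exists("/admin/reassign_partitions"))  # False: cleaned up on exit
```

With this shape, the next loop iteration never finds a stale /admin/reassign_partitions node, whether the previous run succeeded or raised.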

kafka_topics fails to increase the partition count if the topic list includes both topics to change and topics already in the desired state

Expected Behavior

Partition count changed, no error is thrown

Actual Behavior

Partition count changed, error is thrown:

Unable to initialize Kafka manager: KafkaManagerError: Error while updating topic 'topic_b' partitions. Error key is INVALID_PARTITIONS, Number of partitions is invalid.. Request was CreatePartitionsRequest_v0(topic_partitions=[(topic='topic_b', new_partitions=(count=<built-in method count of tuple object at 0x7fcc18234200>, assignment=[])), (topic='topic_a', new_partitions=(count=<built-in method count of tuple object at 0x7fcc180bdd40>, assignment=[[202, 203, 204]]))], timeout=15000, validate_only=False).

Tasks to Reproduce the Problem

  - name: "manage topics - init"
    kafka_topics:
      api_version: "2.6.0"
      topics:
        - name: topic_a
          partitions: 1
          replica_factor: 3
        - name: topic_b
          partitions: 1
          replica_factor: 3
      zookeeper: "localhost:2181"
      bootstrap_servers: "localhost:9092"

  - name: "manage topics - error"
    kafka_topics:
      api_version: "2.6.0"
      topics:
        - name: topic_a
          partitions: 2
          replica_factor: 3
        - name: topic_b
          partitions: 1
          replica_factor: 3
      zookeeper: "localhost:2181"
      bootstrap_servers: "localhost:9092"

Code with problem

            for _new_partition in range(partitions - old_partition):
                assignment = []
                for _replica in range(total_replica):
                    assignment.append(next(brokers_iterator))
                assignments.append(assignment)
            topics_assignments.append(
                (topic_name, (partitions, assignments))
            )

For the good topics (those which do not require any change), the code adds a record as well, with empty assignments, e.g. from the example above:

(topic='topic_b', new_partitions=(count=<built-in method count of tuple object at 0x7fcc18234200>, assignment=[]

The proposed solution is to wrap that block in an if partitions > old_partition: check.
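
A minimal sketch of the proposed guard (the function name and the input shape are hypothetical; the real module derives these values from the current topic state):

```python
from itertools import cycle

def build_topics_assignments(topics, brokers, total_replica):
    """Build (topic_name, (partitions, assignments)) entries, skipping
    topics whose partition count does not need to grow."""
    brokers_iterator = cycle(brokers)
    topics_assignments = []
    for topic_name, old_partition, partitions in topics:
        # The proposed fix: only emit an entry when partitions must be added.
        if partitions > old_partition:
            assignments = []
            for _new_partition in range(partitions - old_partition):
                assignments.append(
                    [next(brokers_iterator) for _ in range(total_replica)])
            topics_assignments.append((topic_name, (partitions, assignments)))
    return topics_assignments

# topic_a grows from 1 to 2 partitions; topic_b is already at its target,
# so it no longer contributes an empty-assignment entry to the request.
print(build_topics_assignments(
    [("topic_a", 1, 2), ("topic_b", 1, 1)],
    brokers=[202, 203, 204],
    total_replica=3,
))  # [('topic_a', (2, [[202, 203, 204]]))]
```

Leaving unchanged topics out of the CreatePartitionsRequest avoids the INVALID_PARTITIONS response shown above.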

UnboundLocalError: local variable 'manager' referenced before assignment while using SSL

Expected Behavior

Creating ACL for a Topic

Actual Behavior

Error caused by a reference to an unassigned variable in module_utils/kafka_lib_acl.py.

The problem seems to be that the variable manager is declared and initialized inside a try block, on line 51. Declaring manager = None outside the try block would probably help.
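
The suggested fix can be sketched like this. The stub `get_manager_from_params` below only simulates the SSL handshake failure; the real helper lives in kafka_lib_commons.py:

```python
def get_manager_from_params(params):
    """Stand-in for the real helper, rigged to fail during setup the way
    the SSL handshake does in the reported traceback."""
    raise ConnectionError("certificate verify failed")

def process_module_acls(params):
    """Sketch of the fix: bind `manager` before the try block so cleanup
    code cannot hit an UnboundLocalError when setup itself raises."""
    manager = None  # the suggested fix: declared outside the try block
    try:
        manager = get_manager_from_params(params)  # may raise before assigning
        return manager.describe_acls()
    finally:
        if manager is not None:
            manager.close()

try:
    process_module_acls({})
except ConnectionError as e:
    print("original error surfaces, not UnboundLocalError:", e)
```

With the guard in place, the original SSL error propagates instead of being masked by the secondary UnboundLocalError seen in the logs.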

Play to Reproduce the Problem

- name: Create acl
  StephenSorriaux.ansible_kafka_admin.kafka_acl:
    acl_resource_type: 'topic'
    name: 'topic1'
    acl_principal: 'User:Alice'
    acl_operation: 'write'
    acl_permission: 'allow'
    acl_pattern_type: 'literal'
    state: 'present'
    bootstrap_servers: "kafka-headless.kafka.svc:9092"
    security_protocol: 'SSL'
    ssl_cafile: "{{ lookup('ENV', 'KAFKA_CA_CERT') }}"
    ssl_certfile: "{{ lookup('ENV', 'KAFKA_CLIENT_CERT') }}"
    ssl_keyfile: "{{ lookup('ENV', 'KAFKA_CLIENT_KEY') }}"

Logs from the play with Ansible in debug mode

fatal: [localhost]: FAILED! => {"changed": false, "module_stderr": "Traceback (most recent call last):\n File \"/tmp/ansible_StephenSorriaux.ansible_kafka_admin.kafka_acl_payload_dms23v4o/ansible_StephenSorriaux.ansible_kafka_admin.kafka_acl_payload.zip/ansible_collections/StephenSorriaux/ansible_kafka_admin/plugins/module_utils/kafka_lib_acl.py\", line 51, in process_module_acls\n File \"/tmp/ansible_StephenSorriaux.ansible_kafka_admin.kafka_acl_payload_dms23v4o/ansible_StephenSorriaux.ansible_kafka_admin.kafka_acl_payload.zip/ansible_collections/StephenSorriaux/ansible_kafka_admin/plugins/module_utils/kafka_lib_commons.py\", line 265, in get_manager_from_params\n File \"/tmp/ansible_StephenSorriaux.ansible_kafka_admin.kafka_acl_payload_dms23v4o/ansible_StephenSorriaux.ansible_kafka_admin.kafka_acl_payload.zip/ansible_collections/StephenSorriaux/ansible_kafka_admin/plugins/module_utils/kafka_manager.py\", line 85, in __init__\n File \"/opt/ansible/.local/lib/python3.8/site-packages/kafka/client_async.py\", line 244, in __init__\n self.config['api_version'] = self.check_version(timeout=check_timeout)\n File \"/opt/ansible/.local/lib/python3.8/site-packages/kafka/client_async.py\", line 909, in check_version\n version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))\n File \"/opt/ansible/.local/lib/python3.8/site-packages/kafka/conn.py\", line 1238, in check_version\n if not self.connect_blocking(timeout_at - time.time()):\n File \"/opt/ansible/.local/lib/python3.8/site-packages/kafka/conn.py\", line 340, in connect_blocking\n self.connect()\n File \"/opt/ansible/.local/lib/python3.8/site-packages/kafka/conn.py\", line 429, in connect\n if self._try_handshake():\n File \"/opt/ansible/.local/lib/python3.8/site-packages/kafka/conn.py\", line 508, in _try_handshake\n self._sock.do_handshake()\n File \"/usr/lib64/python3.8/ssl.py\", line 1309, in do_handshake\n 
self._sslobj.do_handshake()\nssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: Hostname mismatch, certificate is not valid for 'kafka-headless.kafka.svc'. (_ssl.c:1125)\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n File \"/tmp/ansible_StephenSorriaux.ansible_kafka_admin.kafka_acl_payload_dms23v4o/ansible_StephenSorriaux.ansible_kafka_admin.kafka_acl_payload.zip/ansible_collections/StephenSorriaux/ansible_kafka_admin/plugins/module_utils/kafka_lib_acl.py\", line 170, in process_module_acls\n File \"/usr/lib64/python3.8/traceback.py\", line 167, in format_exc\n return \"\".join(format_exception(*sys.exc_info(), limit=limit, chain=chain))\n File \"/usr/lib64/python3.8/traceback.py\", line 120, in format_exception\n return list(TracebackException(\n File \"/usr/lib64/python3.8/traceback.py\", line 508, in __init__\n self.stack = StackSummary.extract(\n File \"/usr/lib64/python3.8/traceback.py\", line 340, in extract\n if limit >= 0:\nTypeError: '>=' not supported between instances of 'SSLCertVerificationError' and 'int'\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n File \"/opt/ansible/.ansible/tmp/ansible-tmp-1642581129.8590431-384-13047009424365/AnsiballZ_kafka_acl.py\", line 102, in <module>\n _ansiballz_main()\n File \"/opt/ansible/.ansible/tmp/ansible-tmp-1642581129.8590431-384-13047009424365/AnsiballZ_kafka_acl.py\", line 94, in _ansiballz_main\n invoke_module(zipped_mod, temp_path, ANSIBALLZ_PARAMS)\n File \"/opt/ansible/.ansible/tmp/ansible-tmp-1642581129.8590431-384-13047009424365/AnsiballZ_kafka_acl.py\", line 40, in invoke_module\n runpy.run_module(mod_name='ansible_collections.StephenSorriaux.ansible_kafka_admin.plugins.modules.kafka_acl', init_globals=None, run_name='__main__', alter_sys=True)\n File \"/usr/lib64/python3.8/runpy.py\", line 207, in run_module\n return _run_module_code(code, 
init_globals, run_name, mod_spec)\n File \"/usr/lib64/python3.8/runpy.py\", line 97, in _run_module_code\n _run_code(code, mod_globals, init_globals,\n File \"/usr/lib64/python3.8/runpy.py\", line 87, in _run_code\n exec(code, run_globals)\n File \"/tmp/ansible_StephenSorriaux.ansible_kafka_admin.kafka_acl_payload_dms23v4o/ansible_StephenSorriaux.ansible_kafka_admin.kafka_acl_payload.zip/ansible_collections/StephenSorriaux/ansible_kafka_admin/plugins/modules/kafka_acl.py\", line 142, in <module>\n File \"/tmp/ansible_StephenSorriaux.ansible_kafka_admin.kafka_acl_payload_dms23v4o/ansible_StephenSorriaux.ansible_kafka_admin.kafka_acl_payload.zip/ansible_collections/StephenSorriaux/ansible_kafka_admin/plugins/modules/kafka_acl.py\", line 138, in main\n File \"/tmp/ansible_StephenSorriaux.ansible_kafka_admin.kafka_acl_payload_dms23v4o/ansible_StephenSorriaux.ansible_kafka_admin.kafka_acl_payload.zip/ansible_collections/StephenSorriaux/ansible_kafka_admin/plugins/module_utils/kafka_lib_acl.py\", line 36, in process_module_acl\n File \"/tmp/ansible_StephenSorriaux.ansible_kafka_admin.kafka_acl_payload_dms23v4o/ansible_StephenSorriaux.ansible_kafka_admin.kafka_acl_payload.zip/ansible_collections/StephenSorriaux/ansible_kafka_admin/plugins/module_utils/kafka_lib_acl.py\", line 174, in process_module_acls\nUnboundLocalError: local variable 'manager' referenced before assignment\n", "module_stdout": "Loading SSL CA from /tmp/cafiler0y4xzdg\nLoading SSL CA from /tmp/cafiler0y4xzdg\nLoading SSL Cert from /tmp/certfileg4lbxg0k\nLoading SSL Cert from /tmp/certfileg4lbxg0k\nLoading SSL Key from /tmp/keyfileqsfnamxh\nLoading SSL Key from /tmp/keyfileqsfnamxh\n<BrokerConnection node_id=bootstrap-0 host=kafka-headless.kafka.svc:9092 <connecting> [IPv4 ('10.245.0.69', 9092)]>: connecting to kafka-headless.kafka.svc:9092 [('10.245.0.69', 9092) IPv4]\n<BrokerConnection node_id=bootstrap-0 host=kafka-headless.kafka.svc:9092 <connecting> [IPv4 ('10.245.0.69', 9092)]>: connecting to 
kafka-headless.kafka.svc:9092 [('10.245.0.69', 9092) IPv4]\nProbing node bootstrap-0 broker version\nProbing node bootstrap-0 broker version\n", "msg": "MODULE FAILURE\nSee stdout/stderr for the exact error", "rc": 1}

Specifications

  • Library version: 0.15.0
  • Result of pip list command:
  • Kafka version: 2.8.1
  • Python version: 3.8
  • OS: 5.15.12-100.fc34.x86_64

Unspecific error message from the kafka_info module for topics

Expected Behavior

Get a list of all topics inside the Kafka cluster.

Actual Behavior

Get Error:
fatal: [localhost]: FAILED! => {"changed": false, "msg": "Seomthing went wrong: 'min.insync.replicas' "}

The error was probably thrown here:

msg='Seomthing went wrong: %s ' % e

By the way, there is a typo there too ("Seomthing").
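
The KeyError in the traceback further down comes from an unconditional topic_config['min.insync.replicas'] lookup. A defensive version could look like the sketch below; the broker-default fallback is an assumption about desirable behavior, not the module's current code:

```python
def get_min_isr(topic_config, broker_default=None):
    """Read min.insync.replicas without raising KeyError: a topic with no
    explicit override falls back to an optional broker-level default."""
    value = topic_config.get('min.insync.replicas', broker_default)
    return int(value) if value is not None else None

print(get_min_isr({'min.insync.replicas': '2'}))  # 2
print(get_min_isr({}, broker_default='1'))        # 1
print(get_min_isr({}))                            # None
```

This would also make the eventual error message point at the real cause rather than surfacing a bare dict key.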

Play to Reproduce the Problem

  1. A kafka cluster deployed via Helm
helm upgrade -i -n kafka kafka bitnami/kafka --set auth.tls.type=pem --set deleteTopicEnable=true --set auth.clientProtocol=tls --set auth.interBrokerProtocol=tls --set auth.tls.existingSecrets[0]=kafka-broker-tls --set replicaCount=1 --set authorizerClassName=kafka.security.authorizer.AclAuthorizer
  2. At least one topic needs to exist
- name: "TopicClaim: Find an existing Kafka Topic for that Claim"
  StephenSorriaux.ansible_kafka_admin.kafka_info:
    resource: "topic"
    bootstrap_servers: "{{ bootstrap_servers }}"
    security_protocol: 'SSL'
    ssl_cafile: "{{ ssl_cafile }}"
    ssl_certfile: "{{ ssl_certfile }}"
    ssl_keyfile: "{{ ssl_keyfile }}"
  vars:
    bootstrap_servers: "{{ lookup('ENV', 'KAFKA_BOOTSTRAP_SERVERS') }}"
    ssl_cafile: "{{ lookup('ENV', 'KAFKA_CA_CERT') }}"
    ssl_certfile: "{{ lookup('ENV', 'KAFKA_CLIENT_CERT') }}"
    ssl_keyfile: "{{ lookup('ENV', 'KAFKA_CLIENT_KEY') }}"
  register: kafka_topics

Logs from the play with Ansible in debug mode

ANSIBLE_DEBUG=true ansible-playbook my-awesome-playbook.yml

fatal: [localhost]: FAILED! => {"changed": false, "msg": "Seomthing went wrong: 'min.insync.replicas' "}

...task path: /opt/ansible/roles/topic/tasks/main.yml:2\nincluded: /opt/ansible/roles/topic/tasks/deploy.yml for localhost\n\r\nTASK [topic : Topic: Find an existing Kafka Topic for that Claim] **************\r\ntask path: /opt/ansible/roles/topic/tasks/deploy.yml:2\n[WARNING]: Module invocation had junk after the JSON data: <BrokerConnection\r\nnode_id=0 host=kafka-0.kafka-headless.kafka.svc.cluster.local:9092 <connected>\r\n[IPv4 ('10.245.0.9', 9092)]>: Closing connection.\nfatal: [localhost]: FAILED! => {\"changed\": false, \"msg\": \"Seomthing went wrong: 'min.insync.replicas' \"}\n\r\nPLAY RECAP *********************************************************************\r\nlocalhost : ok=1 changed=0 unreachable=0 failed=1 skipped=0 rescued=0 ignored=0 \r\n\n","job":"8768220180229646268","name":"topic1","namespace":"ipl-operator-system","error":"exit status 2"}
The full traceback is:
File "/tmp/ansible_StephenSorriaux.ansible_kafka_admin.kafka_info_payload_c9e18gfv/ansible_StephenSorriaux.ansible_kafka_admin.kafka_info_payload.zip/ansible_collections/StephenSorriaux/ansible_kafka_admin/plugins/modules/kafka_info.py", line 81, in main
File "/tmp/ansible_StephenSorriaux.ansible_kafka_admin.kafka_info_payload_c9e18gfv/ansible_StephenSorriaux.ansible_kafka_admin.kafka_info_payload.zip/ansible_collections/StephenSorriaux/ansible_kafka_admin/plugins/module_utils/kafka_manager.py", line 1379, in get_resource return self.resource_to_func[resource]()
File "/tmp/ansible_StephenSorriaux.ansible_kafka_admin.kafka_info_payload_c9e18gfv/ansible_StephenSorriaux.ansible_kafka_admin.kafka_info_payload.zip/ansible_collections/StephenSorriaux/ansible_kafka_admin/plugins/module_utils/kafka_manager.py", line 1195, in get_topics_resource
min_isr = int(topic_config['min.insync.replicas'])
fatal: [localhost]: FAILED! => {
"changed": false,
"invocation": {
"module_args": {
"api_version": "7.0.1",
"bootstrap_servers": "kafka.kafka.svc:9092",
"connections_max_idle_ms": 540000,
"request_timeout_ms": 60000,
"resource": "topic",
"sasl_kerberos_service_name": null,
"sasl_mechanism": "PLAIN",
"sasl_plain_password": null,
"sasl_plain_username": null,
"security_protocol": "SSL",
"ssl_cafile": "-----BEGIN CERTIFICATE-----\nMIIBhDCCASugAwIBAgIRAJzCZyq8UCOzTAMp71SoE8YwCgYIKoZIzj0EAwIwIjEg\nMB4GA1UEAxMXY2x1c3Rlci1pbnRlcm1lZGlhdGUtY2EwHhcNMjIwMTE3MDY0MTQx\nWhcNMjIwNDE3MDY0MTQxWjAiMSAwHgYDVQQDExdjbHVzdGVyLWludGVybWVkaWF0\nZS1jYTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABJyVv/tULyk2247b20HOZd/2\n44mqejLiCY7MEHxISyeGfzVYhNvlUl+SI69wyGEylKSBRHEX0jt9XBAUQJ+EiCij\nQjBAMA4GA1UdDwEB/wQEAwICpDAPBgNVHRMBAf8EBTADAQH/MB0GA1UdDgQWBBRe\nM9YQsPK8Gy/ioQccN3XcTTtkEDAKBggqhkjOPQQDAgNHADBEAiBwh41Cz3rLZ4lg\nuzqhS7BEc14i4+rPGSObUFEbwEQcxwIgBQ3BPWFmuHI5HHM1W9uJFOlQEQ2/jZwT\nOQ85HzjGAIM=\n-----END CERTIFICATE-----\n",
"ssl_certfile": "-----BEGIN CERTIFICATE-----\nMIICwTCCAmegAwIBAgIQKD3Y+JXk8nqpsKLTjxW3ETAKBggqhkjOPQQDAjAiMSAw\nHgYDVQQDExdjbHVzdGVyLWludGVybWVkaWF0ZS1jYTAeFw0yMjAxMTkwODIxMzFa\nFw0yMjA0MTkwODIxMzFaMD4xPDA6BgNVBAMTM2lwbC1vcGVyYXRvci1jb250cm9s\nbGVyLW1hbmFnZXIuaXBsLW9wZXJhdG9yLXN5c3RlbTCCASIwDQYJKoZIhvcNAQEB\nBQADggEPADCCAQoCggEBANjYk1clGK1RFSJ009Rv0zX8ZArXnHabaWouznyippWY\nVu6P9m7R4c+a1baWCa/rTV/4+CgqKrZ5qlFpA1YVvAA47Dz40VJ+d8r9w8Sic9kG\nUr4rsBOA9yg00tMeAUothPHfaI/IIPd1gDMqTEqEWmqQ/lJpv1/COSxNmuUf3jC7\ny+luEN7yW4gUtyytxT4ZGkJcP47guHmJ0KIyzp8fJDXojgJVTK+A067q5/dJCaPv\nKze2GA0oZr/RMe6Q0PUMxQnNWZgfeQs3t4lu4pZJYwL2wqrkmvYhX2/bdY93H911\nNWgiuFozRoIIykFdKnG6HUHS1xygTmqq28zil0MkfP0CAwEAAaOBlzCBlDAOBgNV\nHQ8BAf8EBAMCBaAwEwYDVR0lBAwwCgYIKwYBBQUHAwIwDAYDVR0TAQH/BAIwADAf\nBgNVHSMEGDAWgBReM9YQsPK8Gy/ioQccN3XcTTtkEDA+BgNVHREENzA1gjNpcGwt\nb3BlcmF0b3ItY29udHJvbGxlci1tYW5hZ2VyLmlwbC1vcGVyYXRvci1zeXN0ZW0w\nCgYIKoZIzj0EAwIDSAAwRQIhAN6P7szDlR5ibJuEPJII/FhV3OJoIbVBzVoAy4zo\n1u5DAiAqRCi8K6px5W4PKQllWvwqBJwkssr1/Tg/9TFkbi9aCA==\n-----END CERTIFICATE-----\n",
"ssl_check_hostname": true,
"ssl_ciphers": null,
"ssl_crlfile": null,
"ssl_keyfile": "VALUE_SPECIFIED_IN_NO_LOG_PARAMETER",
"ssl_password": null,
"ssl_supported_protocols": null
}

Specifications

  • Library version: 0.15.0
  • Result of pip list command:
  • Kafka version: 2.8.1
  • Python version: 3.8
  • OS: 5.15.12-100.fc34.x86_64

Switching CI to Github Actions

Expected Behavior

Travis is beautiful and nice and does not queue the jobs for almost an hour.

Actual Behavior

Travis queues the jobs for almost an hour if the job is triggered during a kind of "peak time".

How to fix

It is possible to get an idea of the current number of jobs running on Travis via https://www.traviscistatus.com/.
However, it seems a migration is ongoing from travis-ci.org (which will be closed on December 31st) to travis-ci.com, and moving to travis-ci.com would not help that much. Following this discussion, I think it is better to just move to GitHub Actions.

Enhance installation & documentation

I'm not sure if I am misunderstanding how this library is supposed to be used, or if the documentation could be improved?

Why does this install as a Role? How does one use it as a Module/Plugin?

Expected Behavior

  1. Install via ansible-galaxy install -r requirements.yml --roles-path roles
  2. pip install -r requirements.txt
  3. Use the kafka_libs module in playbook tasks from project top level. (Writing playbook tasks that use kafka_libs as the module. Not adding it as a Role in the playbook.)

Actual Behavior

Above would result in

ERROR! couldn't resolve module/action 'kafka_lib'. This often indicates a misspelling, missing collection, or incorrect module path.

If I manually moved/copied the module files, it seemed to work?

  1. mv ./roles/kafka_lib/library/* ./library/
  2. mv ./roles/kafka_lib/module_utils ./plugins/

But that seems like it may not be the best way?

Specifications

  • Library version: 0.10.0
  • Result of pip list command:
  • Kafka version:
  • Python version: Python 2.7.16
  • OS:

Kafka 2.1 does not work

Hello,

I would like to create topics with Kafka 2.1, but it does not work: it cannot connect to the broker.

Cannot determine a controller for your current Kafka server. Is your Kafka server running and available on '10.210.6.104' with security protocol 'PLAINTEXT'?"

- name: create topics
  kafka_topic:
    resource: "topic"
    api_version: "2.3.0"
    name: "{{item.name}}"
    partitions: "{{item.partitions}}"
    replica_factor: "{{item.replication_factor}}"
    state: "present"
    zookeeper: "localhost:2181"
    security_protocol: 'PLAINTEXT'
    options:
      retention.ms: "{{item.retention}}"
      min.insync.replicas: "{{item.min_replicas}}"
      flush.messages: "{{item.flush_messages}}"
    bootstrap_servers: "{{ hostvars[inventory_hostname]['ansible_eth0']['ipv4']['address'] }}:9092"
  with_items: "{{kafka_list_topics_pfa}}"

Add tests for Kafka 2.0

Kafka 2.0 has been out since the 30th of July; we now need to add it to the versions currently tested.

[Module] Create kafka_quotas module

Spec

Add module kafka_quotas to manage quotas on user, ip, ...

Proposed module

kafka_quotas:
  # classic kafka connection options
  ...
  entries:
    - entities:
        - entity_type: user # The entity type
          entity_name: test # The name of the entity, or null if the default
      quotas:
        - key: test # The quota configuration key
          value: 1 # The value to set, otherwise ignored if the value is to be removed
          state: present
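
A hypothetical helper showing how the proposed entries structure could be flattened into individual quota operations before being sent to Kafka (the function name and tuple shape are illustrative only):

```python
def flatten_quota_entries(entries):
    """Flatten the proposed `entries` structure into
    (entity_type, entity_name, key, value, state) tuples, one per quota."""
    flat = []
    for entry in entries:
        for entity in entry['entities']:
            for quota in entry['quotas']:
                flat.append((
                    entity['entity_type'],
                    entity.get('entity_name'),  # None means the default entity
                    quota['key'],
                    quota.get('value'),
                    quota.get('state', 'present'),
                ))
    return flat

entries = [{
    'entities': [{'entity_type': 'user', 'entity_name': 'test'}],
    'quotas': [{'key': 'test', 'value': 1, 'state': 'present'}],
}]
print(flatten_quota_entries(entries))  # [('user', 'test', 'test', 1, 'present')]
```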

[Module] Create kafka_consumer_group module

Spec

It seems that we need a module to manage consumer groups (commit offsets, delete; list & describe may be handled by kafka_info).

Proposed change

No ideas for now; open to suggestions.

Upgrade to Kafka API v1

It would be great if we could use the features of the new v1 API. For example, the ACLs resource-pattern-type is a great feature.

Setting a PREFIXED ACL to absent has no effect

Expected Behavior

Setting a PREFIXED ACL to absent results in the removal of the ACL entry

Actual Behavior

The playbook runs AND reports that the ACL has been changed, but the ACL is NOT removed

Play to Reproduce the Problem

- hosts: localhost
  gather_facts: false
  vars:
    api_version: "2.3.0"
    security_protocol: "PLAINTEXT"
    zookeeper: "locahost:2181"
    bootstrap_servers: "localhost:9092"
    test_permissions:
      - "describe"
  tasks:
    - name: Set ACL
      vars:
        # set vars for each task
        topic_name: "test.topic.prefix"
        topic_users:
          - "test.topic.user"
      with_nested:
        - "{{topic_name}}"
        - "{{topic_users}}"
        - "{{test_permissions}}"
      kafka_lib:
        # common configuration items
        api_version: "{{api_version}}"
        security_protocol: "{{security_protocol}}"
        zookeeper: "{{zookeeper}}"
        bootstrap_servers: "{{bootstrap_servers}}"
        # task configuration items
        resource: "acl"
        acl_resource_type: "topic"
        name: "{{item.0}}"
        acl_principal: "User:CN={{item.1}}"
        acl_operation: "{{item.2}}"
        acl_permission: "allow"
        acl_pattern_type: "prefixed"
        state: "absent"

Logs from the play with Ansible in debug mode

ansible-playbook 2.8.5
  config file = None
  configured module search path = ['/Users/REDACTED/.ansible/plugins/modules', '/usr/share/ansible/plugins/modules']
  ansible python module location = /usr/local/Cellar/ansible/2.8.5/libexec/lib/python3.7/site-packages/ansible
  executable location = /usr/local/bin/ansible-playbook
  python version = 3.7.7 (default, Mar 10 2020, 15:43:33) [Clang 11.0.0 (clang-1100.0.33.17)]
No config file found; using defaults
setting up inventory plugins
host_list declined parsing /etc/ansible/hosts as it did not pass it's verify_file() method
script declined parsing /etc/ansible/hosts as it did not pass it's verify_file() method
auto declined parsing /etc/ansible/hosts as it did not pass it's verify_file() method
Parsed /etc/ansible/hosts inventory source with ini plugin
Loading callback plugin default of type stdout, v2.0 from /usr/local/Cellar/ansible/2.8.5/libexec/lib/python3.7/site-packages/ansible/plugins/callback/default.py

PLAYBOOK: create_acls.yml ***************************************************************************************************
Positional arguments: create_acls.yml
verbosity: 4
connection: smart
timeout: 10
become_method: sudo
tags: ('all',)
inventory: ('/etc/ansible/hosts',)
forks: 5
1 plays in create_acls.yml

PLAY [localhost] ************************************************************************************************************
META: ran handlers

TASK [Set ACL] **************************************************************************************************************
task path: /Users/REDACTED/data/work/mo/code/_workspace/msk/kafka/env-basic/create_acls.yml:12
<127.0.0.1> ESTABLISH LOCAL CONNECTION FOR USER: REDACTED
<127.0.0.1> EXEC /bin/sh -c 'echo ~REDACTED && sleep 0'
<127.0.0.1> EXEC /bin/sh -c '( umask 77 && mkdir -p "` echo /Users/REDACTED/.ansible/tmp/ansible-tmp-1595494951.0280828-163312069815174 `" && echo ansible-tmp-1595494951.0280828-163312069815174="` echo /Users/REDACTED/.ansible/tmp/ansible-tmp-1595494951.0280828-163312069815174 `" ) && sleep 0'
Using module file /Users/REDACTED/.ansible/plugins/modules/ansible-kafka-admin/library/kafka_lib.py
<127.0.0.1> PUT /Users/REDACTED/.ansible/tmp/ansible-local-29952c31bfrig/tmp8km79jpl TO /Users/REDACTED/.ansible/tmp/ansible-tmp-1595494951.0280828-163312069815174/AnsiballZ_kafka_lib.py
<127.0.0.1> EXEC /bin/sh -c 'chmod u+x /Users/REDACTED/.ansible/tmp/ansible-tmp-1595494951.0280828-163312069815174/ /Users/REDACTED/.ansible/tmp/ansible-tmp-1595494951.0280828-163312069815174/AnsiballZ_kafka_lib.py && sleep 0'
<127.0.0.1> EXEC /bin/sh -c '/usr/local/Cellar/ansible/2.8.5/libexec/bin/python3.7 /Users/REDACTED/.ansible/tmp/ansible-tmp-1595494951.0280828-163312069815174/AnsiballZ_kafka_lib.py && sleep 0'
<127.0.0.1> EXEC /bin/sh -c 'rm -f -r /Users/REDACTED/.ansible/tmp/ansible-tmp-1595494951.0280828-163312069815174/ > /dev/null 2>&1 && sleep 0'
changed: [localhost] => (item=['test.topic.prefix', 'test.topic.user', 'describe']) => {
    "ansible_loop_var": "item",
    "changed": true,
    "invocation": {
        "module_args": {
            "acl_host": "*",
            "acl_operation": "describe",
            "acl_pattern_type": "prefixed",
            "acl_permission": "allow",
            "acl_principal": "User:CN=test.topic.user",
            "acl_resource_type": "topic",
            "api_version": "2.3.0",
            "bootstrap_servers": "localhost:9092",
            "name": "test.topic.prefix",
            "options": null,
            "partitions": 0,
            "replica_factor": 0,
            "resource": "acl",
            "sasl_kerberos_service_name": null,
            "sasl_mechanism": "PLAIN",
            "sasl_plain_password": null,
            "sasl_plain_username": null,
            "security_protocol": "PLAINTEXT",
            "ssl_cafile": null,
            "ssl_certfile": null,
            "ssl_check_hostname": true,
            "ssl_ciphers": null,
            "ssl_crlfile": null,
            "ssl_keyfile": null,
            "ssl_password": null,
            "ssl_supported_protocols": null,
            "state": "absent",
            "zookeeper": "locahost:2181",
            "zookeeper_auth_scheme": "digest",
            "zookeeper_auth_value": "",
            "zookeeper_max_retries": 5,
            "zookeeper_sleep_time": 5,
            "zookeeper_ssl_cafile": null,
            "zookeeper_ssl_certfile": null,
            "zookeeper_ssl_check_hostname": true,
            "zookeeper_ssl_keyfile": null,
            "zookeeper_ssl_password": null
        }
    },
    "item": [
        "test.topic.prefix",
        "test.topic.user",
        "describe"
    ],
    "msg": "acl 'test.topic.prefix': successfully deleted."
}
META: ran handlers
META: ran handlers

PLAY RECAP ******************************************************************************************************************
localhost                  : ok=1    changed=1    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0

Result (no effect):

$ kafka-acls.sh --authorizer-properties zookeeper.connect=$zookeeper --list
Current ACLs for resource `Topic:PREFIXED:test.topic.prefix`:
 	User:CN=test.topic.user has Allow permission for operations: Describe from hosts: *

Specifications

- Library version: latest
- Result of `pip list` command: 
    kafka-python                 1.4.4
    kazoo                        2.6.1
    pure-sasl                    0.5.1
- Kafka version: kafka_2.12-2.3.0.jar
- Python version: 3.7.7
- OS: Darwin computer 19.5.0 Darwin Kernel Version 19.5.0: Thu Apr 30 18:25:59 PDT 2020; root:xnu-6153.121.1~7/RELEASE_X86_64 x86_64. Also seen with RHEL 7.7
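
One possible explanation, sketched as a purely hypothetical matcher (this is not the module's actual code): if the delete request defaults the pattern type to literal instead of carrying acl_pattern_type through, a PREFIXED ACL never matches the filter, so the delete is a no-op even though the module reports a change.

```python
def acl_matches(acl, flt):
    """Hypothetical ACL filter match. A filter that omits the pattern type
    falls back to 'literal' and therefore never hits a PREFIXED entry."""
    return (
        acl['resource'] == flt['resource']
        and acl['principal'] == flt['principal']
        and acl['operation'] == flt['operation']
        and acl['pattern_type'] == flt.get('pattern_type', 'literal')
    )

acl = {'resource': 'test.topic.prefix',
       'principal': 'User:CN=test.topic.user',
       'operation': 'describe',
       'pattern_type': 'prefixed'}

# Filter built without the pattern type: no match, delete silently does nothing.
print(acl_matches(acl, {'resource': 'test.topic.prefix',
                        'principal': 'User:CN=test.topic.user',
                        'operation': 'describe'}))  # False

# Filter carrying the pattern type through: matches, so the delete applies.
print(acl_matches(acl, {'resource': 'test.topic.prefix',
                        'principal': 'User:CN=test.topic.user',
                        'operation': 'describe',
                        'pattern_type': 'prefixed'}))  # True
```

If this is the cause, the fix would be to include acl_pattern_type in the deletion filter the module sends to the broker.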

Licensing conflict Apache v2 versus GPL v3

Expected Behavior

There are no conflicts between licenses used.

Actual Behavior

The library is licensed under Apache License 2.0, but the kafka_lib.py and kafka_stat_lag.py files have GNU GPL v3.0 copyright headers.
If the library is a derivative work of GPL software, it cannot be licensed under the Apache v2 license.

Create or update of a topic that already exists fails if the connection is SSL

Expected Behavior

Create of a topic that already exists returns ok
Update of a topic that already exists returns changed

Actual Behavior

Play fails and exits with msg:

Error while initializing Zookeeper client : Connection time-out. Is your Zookeeper server available and running on...

Note that the playbook runs the first time but fails the second time. This means you can create topics, but you cannot add topics to the playbook or update existing topics.

Play to Reproduce the Problem

---
- hosts: bastion_branch
  vars:
    zookeeper: 'REMOVED:2181'
    bootstrap_servers: 'REMOVED:9094'
    api_version: '2.2.1'
    security_protocol: 'SSL'
    ssl_supported_protocols: 'TLSv1.2'
    ssl_check_hostname: False
    ssl_certfile: "{{cert_pem}}"
    ssl_keyfile: "{{key_pem}}"
    topics:
      - "{{env}}.{{product}}.topic"
  tasks:
  - name: Create Topic
    with_items: "{{topics}}"
    kafka_lib:
      api_version: "{{api_version}}"
      security_protocol: "{{security_protocol}}"
      ssl_supported_protocols: "{{ssl_supported_protocols}}"
      zookeeper: "{{zookeeper}}"
      zookeeper_ssl_keyfile: "{{ssl_keyfile}}"
      zookeeper_ssl_certfile: "{{ssl_certfile}}"
      zookeeper_ssl_check_hostname: "{{ssl_check_hostname}}"
      bootstrap_servers: "{{bootstrap_servers}}"
      ssl_check_hostname: "{{ssl_check_hostname}}"
      ssl_certfile: "{{ssl_certfile}}"
      ssl_keyfile: "{{ssl_keyfile}}"
      resource: 'topic'
      name: "{{item}}"
      partitions: 1
      replica_factor: 2
      options:
        retention.ms: -1
      state: 'present'

Logs from the play with Ansible in debug mode

On the first run, it works:

changed: [REMOVED.com] => (item=env.product.test) => {
    "ansible_loop_var": "item",
    "changed": true,
    "invocation": {
        "module_args": {
            "acl_host": "*",
            "acl_operation": null,
            "acl_pattern_type": "literal",
            "acl_permission": "allow",
            "acl_principal": null,
            "acl_resource_type": "topic",
            "api_version": "2.2.1",
            "bootstrap_servers": "REMOVED:9094,REMOVED:9094,REMOVED:9094",
            "name": "env.product.test",
            "options": {
                "retention.ms": -1
            },
            "partitions": 1,
            "replica_factor": 2,
            "resource": "topic",
            "sasl_kerberos_service_name": null,
            "sasl_mechanism": "PLAIN",
            "sasl_plain_password": null,
            "sasl_plain_username": null,
            "security_protocol": "SSL",
            "ssl_cafile": null,
            "ssl_certfile": "REMOVED.crt",
            "ssl_check_hostname": false,
            "ssl_ciphers": null,
            "ssl_crlfile": null,
            "ssl_keyfile": "VALUE_SPECIFIED_IN_NO_LOG_PARAMETER",
            "ssl_password": null,
            "ssl_supported_protocols": [
                "TLSv1.2"
            ],
            "state": "present",
            "zookeeper": "REMOVED:2181,REMOVED:2181,REMOVED:2181",
            "zookeeper_auth_scheme": "digest",
            "zookeeper_auth_value": "",
            "zookeeper_max_retries": 5,
            "zookeeper_sleep_time": 5,
            "zookeeper_ssl_cafile": null,
            "zookeeper_ssl_certfile": "REMOVED.crt",
            "zookeeper_ssl_check_hostname": false,
            "zookeeper_ssl_keyfile": "VALUE_SPECIFIED_IN_NO_LOG_PARAMETER",
            "zookeeper_ssl_password": null
        }
    },
    "item": "env.product.test",
    "msg": "topic 'env.product.test': successfully created."

When re-run, it fails: it reports a connection timeout, but that doesn't seem likely:

The full traceback is:
WARNING: The below traceback may *not* be related to the actual failure.
  File "/tmp/ansible_kafka_lib_payload_CzGeY8/__main__.py", line 1758, in main
    verify_certs=zookeeper_ssl_check_hostname
  File "/tmp/ansible_kafka_lib_payload_CzGeY8/__main__.py", line 879, in init_zk_client
    self.zk_client.start()
  File "/home/REMOVED /.local/lib/python2.7/site-packages/kazoo/client.py", line 582, in start
    raise self.handler.timeout_exception("Connection time-out")
failed: [REMOVED.com] (item=env.product.test) => {
    "ansible_loop_var": "item",
    "changed": false,
    "invocation": {
        "module_args": {
            "acl_host": "*",
            "acl_operation": null,
            "acl_pattern_type": "literal",
            "acl_permission": "allow",
            "acl_principal": null,
            "acl_resource_type": "topic",
            "api_version": "2.2.1",
            "bootstrap_servers": "REMOVED:9094,REMOVED:9094,REMOVED:9094",
            "name": "env.product.test",
            "options": {
                "retention.ms": -1
            },
            "partitions": 1,
            "replica_factor": 2,
            "resource": "topic",
            "sasl_kerberos_service_name": null,
            "sasl_mechanism": "PLAIN",
            "sasl_plain_password": null,
            "sasl_plain_username": null,
            "security_protocol": "SSL",
            "ssl_cafile": null,
            "ssl_certfile": "REMOVED.crt",
            "ssl_check_hostname": false,
            "ssl_ciphers": null,
            "ssl_crlfile": null,
            "ssl_keyfile": "VALUE_SPECIFIED_IN_NO_LOG_PARAMETER",
            "ssl_password": null,
            "ssl_supported_protocols": [
                "TLSv1.2"
            ],
            "state": "present",
            "zookeeper": "REMOVED:2181,REMOVED:2181,REMOVED:2181",
            "zookeeper_auth_scheme": "digest",
            "zookeeper_auth_value": "",
            "zookeeper_max_retries": 5,
            "zookeeper_sleep_time": 5,
            "zookeeper_ssl_cafile": null,
            "zookeeper_ssl_certfile": "REMOVED.crt",
            "zookeeper_ssl_check_hostname": false,
            "zookeeper_ssl_keyfile": "VALUE_SPECIFIED_IN_NO_LOG_PARAMETER",
            "zookeeper_ssl_password": null
        }
    },
    "item": "env.product.test",
    "msg": "Error while initializing Zookeeper client : Connection time-out. Is your Zookeeper server available and running on 'REMOVED:2181,REMOVED:2181,REMOVED:2181'?"
}

PLAY RECAP **************************************************************************************************************************************************************************
ansible_python_interpreter=/usr/bin/python : ok=0    changed=0    unreachable=1    failed=0    skipped=0    rescued=0    ignored=0   
REMOVED.com : ok=1    changed=0    unreachable=0    failed=1    skipped=0    rescued=0    ignored=0

Specifications

  • Library version:
  • Result of pip list command:
Package                          Version
-------------------------------- -------
ansible                          2.6.19
awscli                           1.17.6
Babel                            0.9.6
backports.ssl-match-hostname     3.5.0.1
boto                             2.49.0
boto3                            1.10.37
botocore                         1.13.50
cffi                             1.6.0
chardet                          2.2.1
cloud-init                       18.5
colorama                         0.4.1
configobj                        4.7.2
cryptography                     1.7.2
decorator                        3.4.0
docutils                         0.15.2
enum34                           1.0.4
ethtool                          0.8
futures                          3.1.1
httplib2                         0.9.2
idna                             2.4
iniparse                         0.4
ipaddr                           2.1.11
ipaddress                        1.0.16
IPy                              0.75
javapackages                     1.0.0
Jinja2                           2.7.2
jmespath                         0.9.0
jsonpatch                        1.2
jsonpointer                      1.9
kafka-python                     1.4.4
kazoo                            2.6.1
kitchen                          1.1.1
lxml                             3.2.1
M2Crypto                         0.21.1
Magic-file-extensions            0.2
MarkupSafe                       0.11
paramiko                         2.1.1
passlib                          1.6.5
pciutils                         1.7.3
perf                             0.1
pip                              20.0.1
ply                              3.4
policycoreutils-default-encoding 0.1
prettytable                      0.7.2
pure-sasl                        0.5.1
pyasn1                           0.1.9
pycparser                        2.14
pycrypto                         2.6.1
pycurl                           7.19.0
pygobject                        3.22.0
pygpgme                          0.3
pyinotify                        0.9.4
pyliblzma                        0.5.3
pyOpenSSL                        0.13.1
pyserial                         2.6
PySocks                          1.5.7
python-dateutil                  2.8.0
python-dmidecode                 3.10.13
python-linux-procfs              0.4.9
pytoml                           0.1.14
pyudev                           0.15
pyxattr                          0.5.1
PyYAML                           3.10
registries                       0.1
requests                         2.6.0
rhnlib                           2.5.65
rsa                              3.4.2
s3transfer                       0.2.1
schedutils                       0.4
seobject                         0.1
sepolicy                         1.1
setuptools                       0.9.8
shyaml                           0.6.1
six                              1.14.0
slip                             0.4.0
slip.dbus                        0.4.0
subscription-manager             1.24.13
syspurpose                       1.24.13
urlgrabber                       3.10
urllib3                          1.21.1
wheel                            0.33.6
yum-metadata-parser              1.1.4
  • Kafka version: 2.2.1
  • Python version: 2.7.5, 3.7.6 (tried both)
  • OS: Red Hat Enterprise Linux Server release 7.7 (Maipo)

Add a `json_assignment` parameter

When draining some brokers of their topics, it could be useful to have a json_assignment parameter that lets the user specify how topic partitions are reassigned in the cluster.
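As a sketch, such a parameter could accept the same structure as the JSON file consumed by kafka-reassign-partitions.sh. Everything below is a proposal: neither json_assignment nor this exact shape exists in the module today.

```yaml
# Hypothetical usage: 'json_assignment' is a proposed parameter, not an
# existing option; the structure mirrors the reassignment JSON accepted
# by kafka-reassign-partitions.sh.
- name: Move topic partitions away from broker 102
  kafka_lib:
    resource: 'topic'
    name: 'env.product.test'
    state: 'present'
    json_assignment:
      version: 1
      partitions:
        - topic: 'env.product.test'
          partition: 0
          replicas: [100, 101]
        - topic: 'env.product.test'
          partition: 1
          replicas: [101, 100]
```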

Enabling SASL_SCRAM

Expected Behavior

Accessing Kafka Broker through SASL SCRAM mechanism

Actual Behavior

SASL SCRAM seems not to be available as an option
Kafka brokers throw errors such as:
(Unsupported SASL mechanism PLAIN) (org.apache.kafka.common.network.Selector)

Play to Reproduce the Problem

Simply launch the Ansible module; it loops forever with the error I described.

Logs from the play with Ansible in debug mode

There was no error; it loops when trying to connect to the broker.
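For reference, assuming the module one day reuses kafka-python's SCRAM support (available upstream since 2.0.0), a play could presumably look like the sketch below. The SCRAM-SHA-256 value is an assumption based on kafka-python's mechanism names, not a currently working option:

```yaml
# Hypothetical: 'SCRAM-SHA-256'/'SCRAM-SHA-512' are kafka-python mechanism
# names; the module does not accept them today.
- name: Create topic over SASL_SSL with SCRAM
  kafka_lib:
    resource: 'topic'
    name: 'env.product.test'
    partitions: 1
    replica_factor: 2
    state: 'present'
    bootstrap_servers: "{{ bootstrap_servers }}"
    security_protocol: 'SASL_SSL'
    sasl_mechanism: 'SCRAM-SHA-256'
    sasl_plain_username: 'admin'
    sasl_plain_password: "{{ vault_kafka_admin_password }}"
```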

Specifications

  • Library version:
  • Result of pip list command:
    Package Version

ansible 2.9.12
certifi 2020.6.20
cffi 1.14.2
chardet 3.0.4
cryptography 3.0
idna 2.10
Jinja2 2.11.2
kafka-python 2.0.2 (I checked the upstream project: SCRAM has been implemented since 2.0.0)
kazoo 2.6.1
lxml 4.5.2
MarkupSafe 1.1.1
pip 20.2.2
pure-sasl 0.5.1
pycparser 2.20
pyOpenSSL 19.1.0
pyvmomi 7.0
PyYAML 5.3.1
requests 2.24.0
setuptools 49.6.0
six 1.15.0
suds-jurko 0.6
urllib3 1.25.10
virtualenv 15.1.0

  • Kafka version: CP-5.5.0 (kafka 2.12)
  • Python version: Python 3.6
  • OS: RHEL 7.6

Do you plan to implement the SCRAM mechanism?
How can I help?

Thanks in advance

Consider adding a way to delete consumer groups

We all have some consumer groups that got deprecated and don't have any more members. Deleting those consumer groups would help for Kafka administration (and avoid this huge regex that filters all unused consumer groups).
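If such a feature is implemented, it could follow the shape of the existing resources. The play below is purely hypothetical: consumer_group is not an implemented resource type.

```yaml
# Hypothetical: 'consumer_group' is not a supported 'resource' value today;
# this only sketches what the requested feature could look like.
- name: Delete a deprecated, member-less consumer group
  kafka_lib:
    resource: 'consumer_group'
    name: 'legacy-billing-consumer'
    state: 'absent'
    bootstrap_servers: "{{ bootstrap_servers }}"
```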

ImportError: No module named kafka.client

Hi @StephenSorriaux, great work!
I'm trying to test and include your module in my setup and I have some issues:

Stacktrace:

$ ansible-playbook deploy-topics.yml -vvv
ansible-playbook 2.6.4
  config file = /Users/imalik/dir/dir/ansible.cfg
  configured module search path = [u'/Users/imalik/.ansible/plugins/modules', u'/usr/share/ansible/plugins/modules']
  ansible python module location = /usr/local/Cellar/ansible/2.6.4/libexec/lib/python2.7/site-packages/ansible
  executable location = /usr/local/bin/ansible-playbook
  python version = 2.7.15 (default, Sep 18 2018, 20:16:18) [GCC 4.2.1 Compatible Apple LLVM 9.1.0 (clang-902.0.39.2)]
Using /Users/USER/dir/dir/ansible.cfg as config file
 [WARNING]: Unable to parse /etc/ansible/hosts as an inventory source

 [WARNING]: No inventory was parsed, only implicit localhost is available

 [WARNING]: provided hosts list is empty, only localhost is available. Note that the implicit localhost does not match 'all'


PLAYBOOK: deploy-topics.yml ************************************************************************************************************
1 plays in deploy-topics.yml

PLAY [127.0.0.1] ***********************************************************************************************************************
META: ran handlers

TASK [kafka_lib : create topic] ********************************************************************************************************
task path: /Users/imalik/code/dir/dir/roles/kafka_lib/tasks/main.yml:2
<127.0.0.1> ESTABLISH LOCAL CONNECTION FOR USER: imalik
<127.0.0.1> EXEC /bin/sh -c 'echo ~imalik && sleep 0'
<127.0.0.1> EXEC /bin/sh -c '( umask 77 && mkdir -p "` echo /Users/imalik/.ansible/tmp/ansible-tmp-1541780162.6-66828776799411 `" && echo ansible-tmp-1541780162.6-66828776799411="` echo /Users/imalik/.ansible/tmp/ansible-tmp-1541780162.6-66828776799411 `" ) && sleep 0'
Using module file /Users/imalik/code/dir/dir/roles/kafka_lib/library/kafka_lib.py
<127.0.0.1> PUT /Users/imalik/.ansible/tmp/ansible-local-53388kHbcXW/tmp4ZMbAH TO /Users/imalik/.ansible/tmp/ansible-tmp-1541780162.6-66828776799411/kafka_lib.py
<127.0.0.1> EXEC /bin/sh -c 'chmod u+x /Users/imalik/.ansible/tmp/ansible-tmp-1541780162.6-66828776799411/ /Users/imalik/.ansible/tmp/ansible-tmp-1541780162.6-66828776799411/kafka_lib.py && sleep 0'
<127.0.0.1> EXEC /bin/sh -c '/usr/local/bin/python /Users/imalik/.ansible/tmp/ansible-tmp-1541780162.6-66828776799411/kafka_lib.py && sleep 0'
<127.0.0.1> EXEC /bin/sh -c 'rm -f -r /Users/imalik/.ansible/tmp/ansible-tmp-1541780162.6-66828776799411/ > /dev/null 2>&1 && sleep 0'
The full traceback is:
Traceback (most recent call last):
  File "/var/folders/yk/h6_k8nrn4_bf50dv02_qkltr0000gn/T/ansible_ae_9UZ/ansible_module_kafka_lib.py", line 199, in <module>
    from kafka.client import KafkaClient
ImportError: No module named kafka.client

fatal: [127.0.0.1]: FAILED! => {
    "changed": false,
    "module_stderr": "Traceback (most recent call last):\n  File \"/var/folders/yk/h6_k8nrn4_bf50dv02_qkltr0000gn/T/ansible_ae_9UZ/ansible_module_kafka_lib.py\", line 199, in <module>\n    from kafka.client import KafkaClient\nImportError: No module named kafka.client\n",
    "module_stdout": "",
    "msg": "MODULE FAILURE",
    "rc": 1
}

PLAY RECAP *****************************************************************************************************************************
127.0.0.1                  : ok=0    changed=0    unreachable=0    failed=1

I have your module in a requirements.yml and ran ansible-galaxy install -r requirements.yml --roles-path roles, then cd'd into that role and ran pip install -r requirements.txt; the libs were installed without an issue.

I added a tasks/main.yml in the kafka_lib folder and used your example (basically the creation of a simple topic), and it fails with the "No module named kafka.client" error above. Do you have any idea or hint where to look?
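For what it's worth, this traceback usually means kafka-python (and kazoo) are not installed for the Python interpreter Ansible runs the module with. Assuming pip is available for that interpreter, a pre-task such as the following should fix it (package names match the role's requirements):

```yaml
# Install the module's runtime dependencies for the interpreter
# Ansible uses on the target (here: localhost).
- name: Ensure kafka_lib python dependencies are present
  pip:
    name:
      - kafka-python
      - kazoo
```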

[Module] Unable to use ssl one way with zookeeper

Expected Behavior

Zk clients should be able to communicate with zookeeper in TLS when zookeeper is not configured to require a client certificate
(zookeeper configuration ssl.clientAuth=none)

Actual Behavior

It is not possible to communicate with Zookeeper if no cert/key is configured on the client side.
(refer to variable zookeeper_use_ssl)

Play to Reproduce the Problem

ZK configured in TLS without requiring client certificate

Logs from the play with Ansible in debug mode

"Something went wrong: ZookeeperBroken does not take keyword arguments"

Specifications

  • Library version: 0.13.2
  • Result of pip list command:
  • Kafka version: 2.5.1
  • Python version: 2.7
  • OS: RHEL 7.x

Add support for `--producer` and `--consumer` ACL actions

Expected Behavior

I'm not an expert on Kafka ACLs, but it seems that the Kafka commands have shortcuts for adding Principals as either a Producer or Consumer.

bin/kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
 --add --allow-principal User:[email protected] \
 --producer --topic test-topic
bin/kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
 --add --allow-principal User:[email protected] \
 --consumer --topic test-topic --group Group-1

It looks like the --producer option adds Create, Describe, and Write on the topic, while --consumer adds Read and Describe on the topic plus Read on the consumer group?

Actual Behavior

I think the current behavior of kafka_lib is that only a single acl_operation can be added at a time, meaning that multiple tasks would be required to add sufficient permissions for many use cases (?)

Allowing a single task to specify multiple acl_operation values would also be an improvement?
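Until such shortcuts exist, --producer can be approximated with one task per operation using the existing single-operation parameters. The play below is only a sketch: the resource: 'acl' value and the lowercase operation names are my assumptions about the module's accepted values, so double-check them against the docs.

```yaml
# Approximates 'kafka-acls --producer' with one task per ACL operation.
- name: Grant producer ACLs on test-topic
  kafka_lib:
    resource: 'acl'
    acl_resource_type: 'topic'
    name: 'test-topic'
    acl_principal: 'User:alice'
    acl_operation: "{{ item }}"
    acl_permission: 'allow'
    state: 'present'
    bootstrap_servers: "{{ bootstrap_servers }}"
  loop:
    - 'write'
    - 'describe'
    - 'create'
```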

Specifications

  • Library version: 0.10.0
  • Result of pip list command:
  • Kafka version:
  • Python version:
  • OS:

Enhance current tests

This library is currently tested using Molecule but the implemented tests do not cover all the functionality.

Connect over SSH

Hi,

I was wondering if there is a way to use this over SSH? My use case is that I have single-node Kafka brokers, listening only on localhost, and I need to manage those.

Get ACLs?

Is it possible to expose getting ACLs, like it is possible with getting topics?

Cannot create topic

Hi - I am getting an error when I try to create a topic - not sure whether it's an actual "Kafka error" or more of a Python import error.

Expected Behavior

- hosts: localhost
  connection: local
  tasks:
    - name: Create list of zookeeper ips
      set_fact:
        zookeeper_ips: "{{ (zookeeper_ips | default([ ]) ) | union([item ~ ':2181']) }}"
      loop: "{{ groups['zookeeper'] }}"
    - name: Create list of kafka ips
      set_fact:
        kafka_ips: "{{ (kafka_ips | default([ ]) ) | union([item ~ ':2181']) }}"
      loop: "{{ groups['kafka_broker'] }}"

    - name: create topic
      kafka_lib:
        resource: "topic"
        api_version: "1.0.1"
        name: "test"
        partitions: 2
        replica_factor: 1
        options:
          retention.ms: 574930
          flush.ms: 12345
        state: "present"
        zookeeper: "{{ zookeeper_ips }}"
        bootstrap_servers: "{{ kafka_ips }}"

Expect a topic "test" to be created

Actual Behavior

TASK [create topic] ***********************************************************************************************************************************************************************************************
task path: /tmp/blub/topics.yml:15
Using module file /tmp/blub/library/kafka_lib.py
Pipelining is enabled.
<127.0.0.1> ESTABLISH LOCAL CONNECTION FOR USER: mosfet
<127.0.0.1> EXEC /bin/sh -c '/usr/bin/python2 && sleep 0'
The full traceback is:
Traceback (most recent call last):
  File "<stdin>", line 102, in <module>
  File "<stdin>", line 94, in _ansiballz_main
  File "<stdin>", line 40, in invoke_module
  File "/usr/lib/python2.7/runpy.py", line 188, in run_module
    fname, loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 82, in _run_module_code
    mod_name, mod_fname, mod_loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/tmp/ansible_kafka_lib_payload_OW01V5/ansible_kafka_lib_payload.zip/ansible/modules/kafka_lib.py", line 21, in <module>
ImportError: No module named kafka.errors
fatal: [localhost]: FAILED! => {
    "changed": false, 
    "module_stderr": "Traceback (most recent call last):\n  File \"<stdin>\", line 102, in <module>\n  File \"<stdin>\", line 94, in _ansiballz_main\n  File \"<stdin>\", line 40, in invoke_module\n  File \"/usr/lib/python2.7/runpy.py\", line 188, in run_module\n    fname, loader, pkg_name)\n  File \"/usr/lib/python2.7/runpy.py\", line 82, in _run_module_code\n    mod_name, mod_fname, mod_loader, pkg_name)\n  File \"/usr/lib/python2.7/runpy.py\", line 72, in _run_code\n    exec code in run_globals\n  File \"/tmp/ansible_kafka_lib_payload_OW01V5/ansible_kafka_lib_payload.zip/ansible/modules/kafka_lib.py\", line 21, in <module>\nImportError: No module named kafka.errors\n", 
    "module_stdout": "", 
    "msg": "MODULE FAILURE\nSee stdout/stderr for the exact error", 
    "rc": 1
}

Specifications

  • Library version:
  • Result of pip list command:
Package                 Version        
----------------------- ---------------
apturl                  0.5.2          
asn1crypto              0.24.0         
blinker                 1.4            
Brlapi                  0.6.7          
certifi                 2018.8.24      
chardet                 3.0.4          
command-not-found       0.3            
cryptography            2.6.1          
cupshelpers             1.0            
dbus-python             1.2.12         
defer                   1.0.6          
distro                  1.3.0          
distro-info             0.21ubuntu4    
entrypoints             0.3            
httplib2                0.11.3         
idna                    2.6            
kafka-python            1.4.4          
kazoo                   2.6.1          
keyring                 18.0.1         
keyrings.alt            3.1.1          
language-selector       0.1            
launchpadlib            1.10.7         
lazr.restfulclient      0.14.2         
lazr.uri                1.0.3          
louis                   3.10.0         
macaroonbakery          1.2.3          
netifaces               0.10.4         
oauthlib                2.1.0          
olefile                 0.46           
pbr                     5.1.1          
pexpect                 4.6.0          
Pillow                  6.1.0          
pip                     18.1           
protobuf                3.6.1          
psutil                  5.5.1          
pure-sasl               0.5.1          
pycairo                 1.16.2         
pycrypto                2.6.1          
pycups                  1.9.73         
Pygments                2.3.1          
PyGObject               3.34.0         
pyinotify               0.9.6          
PyJWT                   1.7.0          
pymacaroons             0.13.0         
PyNaCl                  1.3.0          
pyRFC3339               1.1            
python-apt              1.9.0+ubuntu1.3
python-dateutil         2.7.3          
python-debian           0.1.36         
python-pam              1.8.4          
python-xapp             1.6.0          
python-xlib             0.23           
pytz                    2019.2         
pyxdg                   0.25           
PyYAML                  5.1.2          
reportlab               3.5.23         
requests                2.21.0         
requests-unixsocket     0.1.5          
screen-resolution-extra 0.0.0          
SecretStorage           2.3.1          
setproctitle            1.1.10         
setuptools              41.1.0         
simplejson              3.16.0         
six                     1.12.0         
stevedore               1.31.0         
system-service          0.3            
systemd-python          234            
ubuntu-advantage-tools  19.5           
ubuntu-drivers-common   0.0.0          
ufw                     0.36           
unattended-upgrades     0.1            
urllib3                 1.24.1         
virtualenv              15.1.0         
virtualenv-clone        0.3.0          
virtualenvwrapper       4.8.4          
wadllib                 1.3.3          
wheel                   0.32.3         
xkit                    0.0.0      
  • Kafka version: 2.4.1 (Confluent 5.4.1)
  • Python version: 2.7.17
  • OS: localhost: Ubuntu:19.10, Kafkaserver: Centos8

Error while altering the replication factor and the number of partitions simultaneously

Behavior

We encounter a bug (or at least a reproducible race condition) if we alter the replication factor and the number of partitions of a topic simultaneously. Altering

topic_partitions: 3
topic_replica_factor: 2
topic_state: present

to

topic_partitions: 10
topic_replica_factor: 3
topic_state: present

results in

fatal: [localhost]: FAILED! => changed=false 
  msg: Error while updating topic 'topic.test' partitions. Error key is UNKNOWN, An unexpected server error.. Request was CreatePartitionsRequest_v0(topic_partitions=[(topic='topic.test', new_partitions=(count=<built-in method count of tuple object at 0x7fbaccf0a188>, assignment=[[102, 100], [101, 102], [100, 101], [102, 100], [101, 102], [100, 101], [102, 100]]))], timeout=15000, validate_only=False).

However, the replication factor has been altered:

kafka-topics --zookeeper ????????:2181 --topic topic.test --describe
Topic:topic.test        PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: topic.test       Partition: 0    Leader: 102     Replicas: 102,100,101   Isr: 102,101,100
        Topic: topic.test       Partition: 1    Leader: 101     Replicas: 102,100,101   Isr: 101,100,102
        Topic: topic.test       Partition: 2    Leader: 100     Replicas: 102,100,101   Isr: 100,102,101

Specifications

  • Library version: 0.8.0
  • Result of pip list command:
$ pip3 list
Package      Version
------------ -------
ansible      2.9.1
asn1crypto   0.24.0
bcrypt       3.1.4
cffi         1.11.5
cryptography 2.4.2
idna         2.7
Jinja2       2.10
jmespath     0.9.4
kafka-python 1.4.4
kazoo        2.6.1
kazoo-sasl   2.6.1
MarkupSafe   1.1.0
paramiko     2.4.2
pip          18.1
pure-sasl    0.6.2
pyasn1       0.4.2
pycparser    2.19
pycrypto     2.6.1
PyNaCl       1.3.0
PyYAML       4.1
setuptools   40.6.2
six          1.12.0
  • Kafka version: 2.3.1
  • Python version:
$ ansible --version
ansible 2.9.1
  config file = None
  configured module search path = ['/root/.ansible/plugins/modules', '/usr/share/ansible/plugins/modules']
  ansible python module location = /usr/lib/python3.6/site-packages/ansible
  executable location = /usr/bin/ansible
  python version = 3.6.9 (default, Oct 17 2019, 11:10:22) [GCC 8.3.0]
  • OS: CentOS 7

Add cluster as resource type

Expected Behavior

I want to be able to manage the DESCRIBE_CONFIGS operation for resource type cluster
=> acl_resource_type should be expanded to include cluster

Actual Behavior

It is not possible to manage operations for resource type cluster.

REMARK: not every operation supports resource type cluster (see ACL Format and Operations and Resources on Protocols for details)
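A hypothetical play, once cluster is accepted (the literal resource name of the cluster in Kafka is kafka-cluster; the describe_configs value is an assumption about how the module would spell the operation):

```yaml
# Hypothetical: 'cluster' is not an accepted acl_resource_type yet.
- name: Allow DESCRIBE_CONFIGS on the cluster for the admin principal
  kafka_lib:
    resource: 'acl'
    acl_resource_type: 'cluster'
    name: 'kafka-cluster'
    acl_principal: 'User:admin'
    acl_operation: 'describe_configs'
    acl_permission: 'allow'
    state: 'present'
    bootstrap_servers: "{{ bootstrap_servers }}"
```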

Add testing for several Python versions

Currently the library is tested with Python 2.7.14. Since Ansible 2.5, Python 3 is officially supported, so we also need to test the library with those versions.

Remove dependency to Zookeeper when possible

Expected Behavior

Usage of KafkaAdminClient when possible instead of accessing Zookeeper nodes directly

Actual Behavior

Need of a Zookeeper connection. We have a Zookeeper with Kerberos, which is not supported yet. (And maybe it should not be, if the Kafka admin client can make life easier.)

Play to Reproduce the Problem

Run the create topic code without specifying a ZooKeeper connection

Logs from the play with Ansible in debug mode

fatal: [localhost]: FAILED! => {"changed": false, "msg": "'zookeeper', 'partitions' and 'replica_factor' parameters are needed when parameter 'state' is 'present'"}

Specifications

  • Library version: master
  • Result of pip list command:

ansible (2.4.3.0)
ansible-lint (4.1.0)
anyconfig (0.9.7)
arrow (0.15.5)
atomicwrites (1.3.0)
attrs (19.3.0)
backports.functools-lru-cache (1.6.1)
backports.ssl-match-hostname (3.7.0.1)
bcrypt (3.1.7)
binaryornot (0.4.4)
Cerberus (1.2)
certifi (2019.11.28)
cffi (1.11.1)
chardet (3.0.4)
click (6.7)
click-completion (0.3.1)
colorama (0.3.9)
configparser (4.0.2)
contextlib2 (0.6.0.post1)
cookiecutter (1.6.0)
cryptography (1.4)
distro (1.4.0)
dnspython (1.15.0)
docker-py (1.10.6)
docker-pycreds (0.4.0)
entrypoints (0.3)
enum34 (1.1.6)
fasteners (0.15)
flake8 (3.7.9)
funcsigs (1.0.2)
functools32 (3.2.3.post2)
future (0.18.2)
git-url-parse (1.2.2)
gitdb2 (2.0.6)
GitPython (2.1.14)
httplib2 (0.10.3)
idna (2.7)
importlib (1.0.3)
importlib-metadata (1.4.0)
ipaddress (1.0.22)
Jinja2 (2.10)
jinja2-time (0.2.0)
kafka-python (1.4.4)
kazoo (2.6.1)
MarkupSafe (1.0)
mccabe (0.6.1)
molecule (2.20.0)
monotonic (1.5)
more-itertools (5.0.0)
packaging (20.0)
paramiko (2.1.2)
pathlib2 (2.3.5)
pathspec (0.7.0)
pbr (5.1.1)
pexpect (4.6.0)
pip (9.0.1)
pluggy (0.13.1)
poyo (0.5.0)
psutil (5.4.6)
ptyprocess (0.6.0)
pure-sasl (0.5.1)
py (1.8.1)
pyasn1 (0.4.4)
pycodestyle (2.5.0)
pycparser (2.18)
pycryptodomex (3.4.3)
pyflakes (2.1.1)
PyNaCl (1.3.0)
pyOpenSSL (17.2.0)
pyparsing (2.4.6)
pytest (4.6.9)
pytest-flake8 (1.0.4)
python-ambariclient (0.6.0)
python-dateutil (2.8.1)
python-gilt (1.2.1)
PyYAML (3.12)
requests (2.10.0)
rpm (0.0.2)
ruamel.ordereddict (0.4.14)
ruamel.yaml (0.16.6)
ruamel.yaml.clib (0.2.0)
scandir (1.10.0)
scp (0.10.2)
selinux (0.2.1)
setuptools (44.0.0)
sh (1.12.14)
six (1.11.0)
smmap2 (2.0.5)
tabulate (0.8.2)
testinfra (1.19.0)
tree-format (0.1.2)
typing (3.7.4.1)
urllib3 (1.25.7)
wcwidth (0.1.8)
websocket-client (0.57.0)
wheel (0.29.0)
whichcraft (0.6.1)
yamllint (1.20.0)

  • Kafka version: 2.5.0
  • Python version: 2.7.5
  • OS: Centos 7

Allow pattern usage for topic name

Currently it is only possible to use the name of a topic to change its parameters. It would be great to specify a pattern to apply parameters in "bulk" mode.

[Module] Create kafka_configs module

Spec

A module to manage dynamic configurations

Proposed module

kafka_configs:
  # Classic configuration options
  ...
  configs:
  - resource_type: broker
    resource_name: test
    options:
      key: value

Add Flake8 and pyLint linter for library

Currently, the flake8 linter is only used for the Python files in the molecule/default/tests folder. Linters should also be added for the files in the library folder.

Strange behavior of topic configuration update

Say you define the following options

options:
    retention.ms: 66574936
    flush.ms: 564939

in your playbook.yml and update the topic configuration.

Afterwards you decide to fall back to the global configuration value for flush.ms. So you update your playbook.yml

options:
    retention.ms: 66574936

and perform an update of the topic configuration. Since is_topic_configuration_need_update() checks only the given options it returns False. Therefore update_topic_configuration() will not be called and the value of flush.ms is retained.

Later you modify the retention.ms

options:
    retention.ms: 574936

and perform another update of the topic configuration. Now is_topic_configuration_need_update() returns True and update_topic_configuration() changes the value of retention.ms and resets the value of flush.ms.

Was this behavior intentional?

Unable to get Topic Information when connecting to a Cluster using SSL

Hello,

first of all, I would like to thank you for this brilliant project. We are starting to implement Kafka and want to manage topics through Ansible. That said, this project fits perfectly. However, I just faced an issue, which might already be referenced in #123, but my error is slightly different. Here are the details:

Expected Behavior

run task kafka_info with resource: "topic" and get a list of all available topics

Actual Behavior

Ansible playbook stops with an error:

Additional information

  1. The same playbooks can successfully retrieve the broker and the topic-config through the kafka_info module. It only fails when I want to retrieve the topic information.
  2. The CN of the certificate is a super user of the Kafka Cluster.

Play to Reproduce the Problem

---
- hosts: kafka11t
  remote_user: root
  vars:
    # see roles/cert/default/main.yml for all available
    # parameters
    my_bootstrap_servers: "kafka11t.example.com:9093,kafka12t.example.com:9093,kafka13t.example.com:9093,kafka14t.example.com:9093"
    my_zookeeper_servers: "kafka11t.example.com:2182"
    kafka_api_version: "2.6.0"

  tasks:
    # get information about topics
    - name: get topic information
      StephenSorriaux.ansible_kafka_admin.kafka_info:
        resource: "topic"
        bootstrap_servers: "{{ my_bootstrap_servers }}"
        api_version: "{{ kafka_api_version }}"
        security_protocol: "SSL"
        ssl_cafile: "/etc/pki/ca-trust/source/anchors/MMW-CA-SHA256.cer.crt"
        ssl_certfile: "/etc/pki/tls/certs/{{ ansible_fqdn }}.crt"
        ssl_keyfile: "/etc/pki/tls/private/{{ ansible_fqdn }}.pem"
      changed_when: false
      register: topics_info

    - name: "Display information for topic jh03"
      debug:
        var: topics_info['ansible_module_results']['jh03']
...

Logs from the play with Ansible in debug mode

ANSIBLE_DEBUG=true ansible-playbook my-awesome-playbook.yml

[WARNING]: Module invocation had junk after the JSON data: <BrokerConnection node_id=2 host=kafka12t.example.com:9093 [IPv4 ('10.100.210.63', 9093)]>: Closing connection. <BrokerConnection node_id=4
host=kafka14t.example.com:9093 [IPv4 ('10.100.210.72', 9093)]>: Closing connection.
fatal: [kafka11t]: FAILED! => {"changed": false, "msg": "Error while getting topic from Kafka: KafkaManagerError: Connection is not ready, please check your client and server configurations. "}


Specifications

  • Library version: 0.15.3
  • Result of pip list command:
  • Kafka version: Confluent Platform 7.0.1
  • Python version: 3.6.8
  • OS: Linux - RHEL 8.4 on Ansible node

Output variable name difference for lib kafka_info is not optimal

Hello,

For the kafka_info module, this behavior is far from optimal:

Depending on Ansible version, results will be saved under the results or ansible_module_results key (named <results_key> in below examples).

It should be the same whatever version of Ansible we use; and actually the second name, used for the most recent Ansible versions, is not very user-friendly.

What about modifying it? A simple change in this line to whatever_variable_name=results would do the trick; the variable could be called info or something else.

module.exit_json(changed=True, results=results)

If you're OK with it, I can make a PR for that and the doc (but it would be a breaking change).

kafka-python >=1.4.5 not working

The latest versions of kafka-python changed how initial connections are made. Not sure of the "correct" way to resolve this, but I was able to force a connection by adding a call to manager.client.check_version() and manager.client.poll() before the resource/state if block

I believe it is this PR that introduced the issue: dpkp/kafka-python#1736
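The workaround above can be wrapped in a small helper called once before any admin operation. A hedged sketch (the helper name is hypothetical; `manager.client` follows the structure described in the report, a kafka-python `KafkaClient`):

```python
def ensure_connected(manager):
    """Force kafka-python to finish bootstrapping before admin calls.

    kafka-python >= 1.4.5 connects lazily, so broker metadata may not
    be available yet when the module starts issuing requests.
    """
    # check_version() blocks until a broker connection is established
    manager.client.check_version()
    # poll() processes any pending responses and metadata updates
    manager.client.poll()
```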

Unnecessary requirement for acl_operation in module kafka_acls

Expected Behavior

Delete all ACLs for a specific Topic

Actual Behavior

acl_operation is a required attribute of the kafka_acls module, which prevents deleting all ACLs for a specific Topic.

Play to Reproduce the Problem

- name: Delete all ACLs for a Topic in Kafka
  StephenSorriaux.ansible_kafka_admin.kafka_acl.kafka_acls:
    bootstrap_servers: "{{ lookup('ENV', 'KAFKA_BOOTSTRAP_SERVERS') }}"
    acls:
      - acl_resource_type: 'topic'
        name: 'test'
        state: 'absent'

Logs from the play with Ansible in debug mode

acl_operation is required

This error is explicitly thrown here, but the attribute is not declared as required in general. I think it's an unnecessary line of code.

module.fail_json(msg="acl_operation is required")

Kafka does not have that requirement.
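One way to lift the restriction would be to enforce acl_operation only when an ACL is being created, not when deleting. A hedged sketch of such a check (function and field names are illustrative, not the module's actual code):

```python
def validate_acl(acl):
    """Require acl_operation only when the ACL is being created.

    Deleting all ACLs for a resource (state: absent) needs no
    operation, matching Kafka's own ACL filter semantics.
    """
    state = acl.get("state", "present")
    if state == "present" and not acl.get("acl_operation"):
        raise ValueError("acl_operation is required when state is 'present'")
```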

Specifications

  • Library version: 0.15.1
  • Result of pip list command:
  • Kafka version: 2.8.1
  • Python version: 3.8
  • OS: 5.15.12-100.fc34.x86_64
