admin commited on
Commit
1bb1c3c
·
1 Parent(s): be296fa

replace again

Browse files
Files changed (4) hide show
  1. .gitignore +1 -1
  2. app.py +62 -142
  3. requirements.txt +0 -1
  4. utils.py +0 -33
.gitignore CHANGED
@@ -1,3 +1,3 @@
1
- rename.sh
2
  test.*
3
  *__pycache__*
 
1
+ *.gif
2
  test.*
3
  *__pycache__*
app.py CHANGED
@@ -1,23 +1,15 @@
1
- import os
2
- import imghdr
3
- import hashlib
4
- import exifread
5
- import gradio as gr
6
  import pandas as pd
7
- from PIL import Image
8
- from utils import clean_dir, compress, mk_dir, unzip, TMP_DIR, EN_US
9
 
10
  ZH2EN = {
11
- "单图片处理": "Process single picture",
12
- "上传图片": "Upload picture",
13
- "导出原格式": "Export original format",
14
- "下载清理 EXIF 后的图片": "Download cleaned picture",
15
- "批量处理": "Batch processor",
16
- "上传包含多图片的 zip 压缩包 (确保上传进度至 100% 后再提交)": "Upload pictures zip (please ensure the zip is completely uploaded before clicking submit)",
17
- "导出原格式": "Export original format",
18
- "下载清理 EXIF 后的多图片压缩包": "Download cleaned pictures",
19
- "EXIF 列表": "EXIF list",
20
  "状态栏": "Status",
 
 
21
  }
22
 
23
 
@@ -25,147 +17,75 @@ def _L(zh_txt: str):
25
  return ZH2EN[zh_txt] if EN_US else zh_txt
26
 
27
 
28
- def get_exif(origin_file_path):
29
- with open(origin_file_path, "rb") as image_file:
30
- tags = exifread.process_file(image_file)
31
-
32
- output = ""
33
- for key in tags.keys():
34
- value = str(tags[key])
35
- output += "{0}:{1}\n".format(key, value)
36
-
37
- return output
38
-
39
 
40
- def clear_exif(img_path: str, cache: str, img_mode=None, outdir=""):
41
- save_path = f"{cache}/{outdir}output." + img_path.split(".")[-1]
42
- img = Image.open(img_path)
43
- data = list(img.getdata())
44
- if img_mode:
45
- save_path = f"{cache}/{outdir}{hashlib.md5(img_path.encode()).hexdigest()}.jpg"
46
- else:
47
- img_mode = img.mode
48
 
49
- img_without_exif = Image.new(img_mode, img.size)
50
- img_without_exif.putdata(data)
51
- img_without_exif.save(save_path)
52
- return save_path
 
53
 
 
 
 
 
 
 
 
 
 
 
54
 
55
- def find_images(dir_path: str):
56
- found_images = []
57
- for root, _, files in os.walk(dir_path):
58
- for file in files:
59
- fpath = os.path.join(root, file).replace("\\", "/")
60
- if imghdr.what(fpath) != None:
61
- found_images.append(fpath)
62
-
63
- return found_images
64
 
65
 
66
  # outer func
67
- def infer(img_path: str, keep_ext: bool, cache=f"{TMP_DIR}/exif"):
 
68
  status = "Success"
69
- out_img = out_exif = None
70
  try:
71
- if not img_path or imghdr.what(img_path) == None:
72
- raise ValueError("请输入图片!")
73
-
74
  clean_dir(cache)
75
- img_mode = "RGB" if not keep_ext else None
76
- out_img = clear_exif(img_path, cache, img_mode)
77
- out_exif = get_exif(img_path)
 
78
 
79
- except Exception as e:
80
- status = f"{e}"
81
-
82
- return status, out_img, out_exif
83
-
84
-
85
- # outer func
86
- def batch_infer(imgs_zip: str, keep_ext: bool, cache=f"{TMP_DIR}/exif"):
87
- status = "Success"
88
- out_images = out_exifs = None
89
- try:
90
- if not imgs_zip:
91
- raise ValueError("Please upload pictures zip!")
92
-
93
- clean_dir(cache)
94
- mk_dir(f"{cache}/outputs")
95
- extract_to = f"{cache}/inputs"
96
- unzip(imgs_zip, extract_to)
97
- imgs = find_images(extract_to)
98
- img_mode = "RGB" if not keep_ext else None
99
- exifs = []
100
- for img in imgs:
101
- clear_exif(img, cache, img_mode, "outputs/")
102
- exifs.append(
103
- {"filename": os.path.basename(img), "exif": get_exif(img)})
104
-
105
- if not exifs:
106
- raise ValueError("No picture in the zip")
107
-
108
- out_images = f"{cache}/outputs.zip"
109
- compress(f"{cache}/outputs", out_images)
110
- out_exifs = pd.DataFrame(exifs)
111
 
112
  except Exception as e:
113
  status = f"{e}"
114
 
115
- return status, out_images, out_exifs
116
 
117
 
118
  if __name__ == "__main__":
119
- with gr.Blocks() as iface:
120
- with gr.Tab(_L("单图片处理")):
121
- gr.Interface(
122
- fn=infer,
123
- inputs=[
124
- gr.File(
125
- label=_L("上传图片"),
126
- file_types=["image"],
127
- ),
128
- gr.Checkbox(
129
- label=_L("导出原格式"),
130
- value=False,
131
- ),
132
- ],
133
- outputs=[
134
- gr.Textbox(label=_L("状态栏"), show_copy_button=True),
135
- gr.Image(
136
- label=_L("下载清理 EXIF 后的图片"),
137
- type="filepath",
138
- show_share_button=False,
139
- ),
140
- gr.Textbox(label="EXIF", show_copy_button=True),
141
- ],
142
- flagging_mode="never",
143
- )
144
-
145
- with gr.Tab(_L("批量处理")):
146
- gr.Interface(
147
- fn=batch_infer,
148
- inputs=[
149
- gr.File(
150
- label=_L(
151
- "上传包含多图片的 zip 压缩包 (确保上传进度至 100% 后再提交)"
152
- ),
153
- file_types=[".zip"],
154
- ),
155
- gr.Checkbox(
156
- label=_L("导出原格式"),
157
- value=False,
158
- ),
159
- ],
160
- outputs=[
161
- gr.Textbox(label=_L("状态栏"), show_copy_button=True),
162
- gr.File(
163
- label=_L("下载清理 EXIF 后的多图片压缩包"),
164
- type="filepath",
165
- ),
166
- gr.Dataframe(label=_L("EXIF 列表")),
167
- ],
168
- flagging_mode="never",
169
- )
170
-
171
- iface.launch()
 
1
+ import csv
2
+ import random
 
 
 
3
  import pandas as pd
4
+ import gradio as gr
5
+ from utils import clean_dir, TMP_DIR, EN_US
6
 
7
  ZH2EN = {
8
+ "输入参与者数量": "Number of participants",
9
+ "输入分组比率 (格式为用:隔开的数字,生成随机分组数据)": "Grouping ratio (numbers separated by : to generate randomized controlled trial)",
 
 
 
 
 
 
 
10
  "状态栏": "Status",
11
+ "下载随机分组数据 CSV": "Download data CSV",
12
+ "随机分组数据预览": "Data preview",
13
  }
14
 
15
 
 
17
  return ZH2EN[zh_txt] if EN_US else zh_txt
18
 
19
 
20
+ def list_to_csv(list_of_dicts: list, filename: str):
21
+ keys = dict(list_of_dicts[0]).keys()
22
+ # 将列表中的字典写入 CSV 文件
23
+ with open(filename, "w", newline="", encoding="utf-8") as csvfile:
24
+ writer = csv.DictWriter(csvfile, fieldnames=keys)
25
+ writer.writeheader()
26
+ for data in list_of_dicts:
27
+ writer.writerow(data)
 
 
 
28
 
 
 
 
 
 
 
 
 
29
 
30
+ def random_allocate(participants: int, ratio: list, out_csv: str):
31
+ splits = [0]
32
+ total = sum(ratio)
33
+ for i, r in enumerate(ratio):
34
+ splits.append(splits[i] + int(1.0 * r / total * participants))
35
 
36
+ splits[-1] = participants
37
+ partist = list(range(1, participants + 1))
38
+ random.shuffle(partist)
39
+ allocation = []
40
+ groups = len(ratio)
41
+ for i in range(groups):
42
+ start = splits[i]
43
+ end = splits[i + 1]
44
+ for participant in partist[start:end]:
45
+ allocation.append({"id": participant, "group": i + 1})
46
 
47
+ sorted_data = sorted(allocation, key=lambda x: x["id"])
48
+ list_to_csv(sorted_data, out_csv)
49
+ return out_csv, pd.DataFrame(sorted_data)
 
 
 
 
 
 
50
 
51
 
52
  # outer func
53
+ def infer(participants: float, ratios: str, cache=f"{TMP_DIR}/rct"):
54
+ ratio = []
55
  status = "Success"
56
+ out_csv = previews = None
57
  try:
58
+ ratio_list = ratios.split(":")
 
 
59
  clean_dir(cache)
60
+ for r in ratio_list:
61
+ current_ratio = float(r.strip())
62
+ if current_ratio > 0:
63
+ ratio.append(current_ratio)
64
 
65
+ out_csv, previews = random_allocate(
66
+ int(participants), ratio, f"{cache}/output.csv"
67
+ )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68
 
69
  except Exception as e:
70
  status = f"{e}"
71
 
72
+ return status, out_csv, previews
73
 
74
 
75
  if __name__ == "__main__":
76
+ gr.Interface(
77
+ fn=infer,
78
+ inputs=[
79
+ gr.Number(label=_L("输入参与者数量"), value=10),
80
+ gr.Textbox(
81
+ label=_L("输入分组比率 (格式为用:隔开的数字,生成随机分组数据)"),
82
+ value="8:1:1",
83
+ ),
84
+ ],
85
+ outputs=[
86
+ gr.Textbox(label=_L("状态栏"), show_copy_button=True),
87
+ gr.File(label=_L("下载随机分组数据 CSV")),
88
+ gr.Dataframe(label=_L("随机分组数据预览")),
89
+ ],
90
+ flagging_mode="never",
91
+ ).launch()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
requirements.txt DELETED
@@ -1 +0,0 @@
1
- exifread
 
 
utils.py CHANGED
@@ -1,45 +1,12 @@
1
  import os
2
  import shutil
3
- import zipfile
4
 
5
  EN_US = os.getenv("LANG") != "zh_CN.UTF-8"
6
  TMP_DIR = "./__pycache__"
7
 
8
 
9
- def mk_dir(dir_path: str):
10
- if not os.path.exists(dir_path):
11
- os.makedirs(dir_path)
12
-
13
-
14
  def clean_dir(dir_path: str):
15
  if os.path.exists(dir_path):
16
  shutil.rmtree(dir_path)
17
 
18
  os.makedirs(dir_path)
19
-
20
-
21
- def unzip(zip_path: str, extract_to: str):
22
- mk_dir(extract_to)
23
- # 打开ZIP文件
24
- with zipfile.ZipFile(zip_path, "r") as zip_ref:
25
- # 解压文件
26
- zip_ref.extractall(extract_to)
27
-
28
-
29
- def compress(folder_path: str, zip_file: str):
30
- # 确保文件夹存在
31
- if not os.path.exists(folder_path):
32
- raise ValueError(f"错误: 文件夹 '{folder_path}' 不存在")
33
- # 打开 ZIP 文件,使用 'w' 模式表示写入
34
- with zipfile.ZipFile(zip_file, "w", zipfile.ZIP_DEFLATED) as zipf:
35
- # 遍历文件夹中的文件和子文件夹
36
- for root, _, files in os.walk(folder_path):
37
- for file in files:
38
- file_path = os.path.join(root, file)
39
- # 计算相对路径,保留文件夹的根目录
40
- relative_path = os.path.relpath(file_path, folder_path)
41
- zipf.write(
42
- file_path,
43
- arcname=os.path.join(os.path.basename(
44
- folder_path), relative_path),
45
- )
 
1
  import os
2
  import shutil
 
3
 
4
  EN_US = os.getenv("LANG") != "zh_CN.UTF-8"
5
  TMP_DIR = "./__pycache__"
6
 
7
 
 
 
 
 
 
8
  def clean_dir(dir_path: str):
9
  if os.path.exists(dir_path):
10
  shutil.rmtree(dir_path)
11
 
12
  os.makedirs(dir_path)